4. 데이터 다루기 -3(ES Client 라이브러리)
4.5 서비스 코드에서 엘라스틱서치 클라이언트 이용
- 지금까지 배웠던 REST API를 호출해서 ES에 작업을 요청할수도 있겠으나, 이런 방식보다는 ES에서 공식적으로 제공하는 Client 라이브러리를 활용하면 좀 더 간편하게 코드를 작성할 수 있고, 유지보수도 더 쉽다.
- Java, Javascript, Python, Go, .NET 등 다양한 언어에서 사용할수 있는 클라이언트 라이브러리를 제공함.
JVM에서 지원하는 클라이언트
transport 클라이언트
- Deprecated된지 오래됨.
- 8버전부터는 완전히 제거됨.
저수준 Rest Client
- 모든 버전 호환되나, 단순 httpClient 수준으로만 제공됨.
고수준 Rest Client
- 7.15 버전부터 지원 중단 선언되었고, 자바 클라이언트로 전환이 예고되었다.
자바 클라이언트
- 개발 초기 단계이고, 아직 지원하지 않는 기능이 많으나 최신버전의 ES로 신규 구축한다면 사용을 권장한다.
4.5.1 저수준 Rest Client
- HttpComponents를 이용해서 HTTP로 통신한다.
- 요청을 ES API에 맞게 만들고 응답을 역직렬화하는 등의 작업은 클라이언트 사용자가 직접 해야 한다.
- 직접 만들어 호출하기 때문에 모든 ES버전과 호환된다.(실질적으로 api interface에 맞게 다 직접 만드는것이라 호환된다고 보기는 어렵다)
저수준 RestClient Config
1
2
3
4
5
6
dependencies {
val esVersion = "8.11.1"
// es 저수준 rest client
implementation("org.elasticsearch.client:elasticsearch-rest-client:$esVersion")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
class EsConfig {
companion object {
private const val ES_CONNECTION_TIMEOUT = 5000 // 5s
private const val ES_SOCKET_TIMEOUT = 5000 // 5s
}
@Bean("lowLevelEsRestClient")
fun lowLevelEsRestClient(): RestClient {
return RestClient
.builder(
HttpHost("hosts01", 9200, "http"),
HttpHost("hosts02", 9200, "http"),
HttpHost("hosts03", 9200, "http"),
)
.setRequestConfigCallback {
it.setConnectTimeout(ES_CONNECTION_TIMEOUT)
it.setSocketTimeout(ES_SOCKET_TIMEOUT)
}
.build()
}
}
저수준 RestClient 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
class LowLevelEsService(
private val lowLevelEsRestClient: RestClient,
) {
companion object : KLogging()
fun getClusterSettings(): String {
val req = Request("GET", "/_cluster/settings")
req.addParameters(
mapOf(
"pretty" to "true",
),
)
logger.debug { "[ES TEST] start request" }
val res = lowLevelEsRestClient.performRequest(req)
logger.debug { "[ES TEST] response : $res" }
return getBody(res)
}
fun asyncUpdateSetting() {
val req = Request("PUT", "/_cluster/settings")
val requestBody = """
{
"transient": {
"cluster.routing.allocation.enable": "all"
}
}
""".trimIndent()
req.entity = NStringEntity(requestBody, ContentType.APPLICATION_JSON)
logger.debug { "[ES TEST] start request" }
val cancellable = lowLevelEsRestClient.performRequestAsync(
req,
object : ResponseListener {
override fun onSuccess(response: Response) {
logger.debug { "[ES TEST] response : $response" }
}
override fun onFailure(ex: Exception) {
logger.error("[ES TEST] es error", ex)
}
},
)
logger.debug { "[ES TEST] thread sleep" }
Thread.sleep(1000L)
cancellable.cancel()
}
private fun getBody(response: Response): String {
val statusCode = response.statusLine.statusCode
val responseBody = EntityUtils.toString(response.entity, StandardCharsets.UTF_8)
logger.debug { "[ES TEST] statusCode : $statusCode, body : $responseBody" }
return responseBody
}
}
- 위 예제에서 동기식 / 비동기식으로 API를 호출해보는것을 테스트했다.
- 비동기식인 경우 전달받은 Cancellable 객체의 cancel()을 통해 요청을 취소할수 있다.
- 저수준 RestClient는 thread-safe하므로 Spring 환경에서 위처럼 bean으로 한번 등록해두고 재사용할 수 있다.
- 저수준 RestClient는 단순 httpClient을 쓰는것과 차이가 없다. 즉 ES 기능을 사용하려면 Document를 보면서 API 스펙에 맞게 직접 구현해줘야해서 불편할수 있다.
4.5.2 고수준 Rest Client
- 고수준 RestClient는 ES API를 라이브러이의 API로 노출한다. 아예 클라이언트 라이브러리가 ES에 딱 맞게 설계되어있다.
- 다만, 위와 같은 특징때문에 ES 버전과 강하게 결합되어 있어 버전 호환성 이슈가 존재한다.
- 사용하는 ES 클러스터의 버전과 클라이언트 라이브러리 버전을 맞춰야만 호환성 이슈로부터 안전하다.
고수준 Rest Client 버전 관련 정보
- ES 7.15 버전부터 고수준 RestClient 지원 중단 선언됨.(JavaClient로 대체 필요)
- ES5 ~ 7.15 사이의 버전을 사용한다면 고수준 RestClient을 사용하면 된다.
- ES8 이상의 버전을 싸용한다면 자바클라이언트를 사용하는것이 좋다.
고수준 Rest Client Config
1
2
3
4
5
6
7
8
9
dependencies {
val esHighLevelClientVersion = "7.17.16"
// es 고수준 rest client
implementation("org.elasticsearch.client:elasticsearch-rest-high-level-client:$esHighLevelClientVersion")
// 수동 jar 설치 후 적용하는 방식
// implementation(fileTree(mapOf("dir" to "manual-build", "includes" to listOf("*.jar"))))
// implementation("org.elasticsearch:elasticsearch:$esVersion")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
class EsConfig {
companion object {
private const val ES_CONNECTION_TIMEOUT = 5000 // 5s
private const val ES_SOCKET_TIMEOUT = 5000 // 5s
}
@Bean("highLevelEsRestClient")
fun highLevelEsRestClient(): RestHighLevelClient {
val builder = createBaseRestClientBuilder()
return RestHighLevelClientBuilder(builder.build())
// NOTE es 8 이상일 경우 서버 호환을 위해 true로 설정
.setApiCompatibilityMode(true)
.build()
}
@Bean
fun bulkProcessor(
@Qualifier("highLevelEsRestClient") highLevelEsRestClient: RestHighLevelClient,
): BulkProcessor {
val bulkAsync = {
request: BulkRequest, listener: ActionListener<BulkResponse> ->
highLevelEsRestClient.bulkAsync(request, RequestOptions.DEFAULT, listener)
Unit
}
return BulkProcessor
.builder(bulkAsync, MyEsBulkListener(), "myBulkProcessor")
.setBulkActions(50000)
.setBulkSize(ByteSizeValue.ofMb(50L))
.setFlushInterval(TimeValue.timeValueMillis(5000L))
.setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff())
.build()
}
private fun createBaseRestClientBuilder(): RestClientBuilder {
return RestClient
.builder(
HttpHost("localhost", 9200, "http"),
)
.setRequestConfigCallback {
it.setConnectTimeout(ES_CONNECTION_TIMEOUT)
it.setSocketTimeout(ES_SOCKET_TIMEOUT)
}
}
}
고수준 Rest Client 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Service
class HighLevelEsService(
private val highLevelEsRestClient: RestHighLevelClient,
private val bulkProcessor: BulkProcessor,
) {
companion object : KLogging()
fun getSample(): String {
val req = GetRequest()
.index("my-index-01")
.id("document-id-01")
.routing("abc123")
val res = highLevelEsRestClient.get(req, RequestOptions.DEFAULT)
logger.debug { "[ES_TEST] res : $res" }
return res.sourceAsString
}
fun searchSample(): List<MutableMap<String, Any>> {
val queryBuilder = QueryBuilders.boolQuery()
.must(TermQueryBuilder("filedOne", "hello"))
.should(MatchQueryBuilder("filedTwo", "hello world").operator(Operator.AND))
.should(RangeQueryBuilder("filedThree").gte(100).lt(200))
.minimumShouldMatch(1)
val searchSourceBuilder = SearchSourceBuilder()
.from(0)
.size(10)
.query(queryBuilder)
val searchRequest = SearchRequest()
.indices("my-index-01", "my-index-02")
.routing("abc123")
.source(searchSourceBuilder)
val res = highLevelEsRestClient.search(searchRequest, RequestOptions.DEFAULT)
logger.debug { "[ES_TEST] res : $res" }
val searchHits = res.hits
val totalHits = searchHits.totalHits
logger.debug { "[ES_TEST] totalHits : $totalHits" }
return searchHits
.hits
.map { it.sourceAsMap }
}
fun bulk() {
val bulkRequest = BulkRequest()
bulkRequest.add(
UpdateRequest()
.index("my-index-01")
.id("document-id-01")
.routing("abc123")
.doc(mapOf("hello" to "elasticsearch")),
)
val bulkResponse = highLevelEsRestClient.bulk(bulkRequest, RequestOptions.DEFAULT)
if (bulkResponse.hasFailures()) {
logger.error("[ES_TEST] ${bulkResponse.buildFailureMessage()}")
}
bulkResponse.items
logger.debug { "[ES_TEST] bulkResponse : ${ToStringBuilder.reflectionToString(bulkResponse)} " }
}
fun bulkProcessor(id: String) {
val source = mapOf<String, Any>(
"hello" to "world",
"world" to 123,
"name" to "name-$id",
)
val indexRequest = IndexRequest("my-index-01")
.id(id)
.routing("abc123")
.source(source, XContentType.JSON)
bulkProcessor.add(indexRequest)
}
}
- ES Rest API를 호출할때 body로 전달했던 내용들을 Builder Pattern을 제공해준다.
- 저수준 RestClient 보다 훨씬 직관적이고, 추상화된 API를 제공하고 있다.
bulk API 호출하기
- 위에 구현한
HighLevelEsService
의 내용을 참고하면 BulkRequest 라는 클래스를 통해 bulk 처리를 하는 방법과 BulkProcessor을 통해 처리하는 2가지 방법을 제공한다.
BulkProcessor 관련
- BulkProcessor는 필요한 bulk 처리에 대해서 add하게 되면 BulkProssor가 지정된 기준에 맞춰 bulk 요청을 만들어서 보낸다.
- 이런 과정을 BulkProcessor가 flush한다고 표현한다.
BulkProcessor thread-safety 관련
- 개발/테스트하는 과정에서 BulkProcessor을 Spring bean으로 등록하고 사용할수 있는지에 대해서 확인(thread-safety)
- 핵심적인 내용은 아래 코드를 참고할수 있는데, bulk 대상을 add할때 lock을 활용하고 있는것으로 보임.
- 그래서 thread-safety를 보장할수 있을것으로 보임
- 다만, bulkProcessor 특성상 bulk 처리를 내부에서 알아서 처리하므로, 실시간으로 사용자의 응답으로 서빙하는 케이스에서는 사용을 지양하는것이 맞지 않을까 생각됨.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
* (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
* requests allowed to be executed in parallel.
* <p>
* In order to create a new bulk processor, use the {@link Builder}.
*/
public class BulkProcessor implements Closeable {
// ...(중략)
private void internalAdd(DocWriteRequest<?> request) {
// bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
// once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(request);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}
// execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
}
}
BulkProcessor.Listener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MyEsBulkListener : BulkProcessor.Listener {
companion object : KLogging()
override fun beforeBulk(executionId: Long, request: BulkRequest) {
logger.debug { "[ES_TEST] before bulk" }
}
override fun afterBulk(executionId: Long, request: BulkRequest, response: BulkResponse) {
if (!response.hasFailures()) {
logger.debug { "[ES_TEST] bulk success" }
} else {
logger.error { "[ES_TEST] bulk failures" }
val failedItems = response.items
.filter { it.isFailed }
logger.error { "[ES_TEST] failed items : ${ToStringBuilder.reflectionToString(failedItems)}" }
}
}
override fun afterBulk(executionId: Long, request: BulkRequest?, failure: Throwable?) {
logger.debug { "[ES_TEST] after bulk" }
}
}
- BulkProcessor에 Listner를 등록하여 처리 과정 중에 전처리/후처리 등을 직접 구현하여 적용할 수 있다.
ES 8버전에서 고수준 RestClient 사용하기
- ES에서 고수준 RestClient는 지원 중단을 선언하였고, 더이상 maven repo를 통해 신규 버전이 배포되지 않고 있다.
- 다만, 직접 es code를 checkout 받아서 수동으로 build하면 ES8 버전의 jar을 사용할수도 있다.
1
2
3
4
5
6
# checkout
git clone https://github.com/elastic/elasticsearch.git
git checkout tags/v8.11.1
# build
./gradleew clean :client:rest-high-level:build -x test
- 위 빌드 결과물인
build/distributions/elasticsearch-rest-high-level-client-8.11.1-SNAPSHOT.jar
을 프로젝트에 포함시키면 된다.
1
2
3
4
5
6
7
dependencies {
val esVersion = "8.11.1"
// 수동 jar 설치 후 적용하는 방식
implementation(fileTree(mapOf("dir" to "manual-build", "includes" to listOf("*.jar"))))
implementation("org.elasticsearch:elasticsearch:$esVersion")
}
- 다만 , 위 방식으로 테스트해보니 elasticsearch 서버 라이브러리를 통쨰로 넣어야 동작을 하는데, IDE에서 많이 힘들어해서 이 방식은 좋은 방법이 아닌것 같다.
- 따라서, 그냥 JavaClient을 사용하는것이 현명해보인다.
4.5.3 자바 클라이언트
- ES 7.15버전부터 출시되었고, 기존의 고수준 restClient을 대체하게 된 클라이언트이다.(2021.9월 최초 출시)
- 8 이상을 사용한다면 고수준 restClient와 혼용하더라도 점진적으로 새 자바 클라이언트를 도입하는 것이 좋다.
- 아직은 초기라서 고수준 restClient에 비해 불편함이 있을수 있는데 이 부분을 감안해서 선택하는것이 좋다.
자바 클라이언트의 초기화
1
2
3
4
5
6
7
8
9
10
11
12
dependencies {
val esVersion = "8.11.1"
val esHighLevelClientVersion = "7.17.16"
// es 저수준 rest client
implementation("org.elasticsearch.client:elasticsearch-rest-client:$esVersion")
// es 고수준 rest client
implementation("org.elasticsearch.client:elasticsearch-rest-high-level-client:$esHighLevelClientVersion")
// java client
implementation("co.elastic.clients:elasticsearch-java:$esVersion")
}
- 위와 같이
java client
디펜던시 설정을 추가해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Configuration
class EsConfig(
private val objectMapper: ObjectMapper,
) {
companion object {
private const val ES_CONNECTION_TIMEOUT = 5000 // 5s
private const val ES_SOCKET_TIMEOUT = 5000 // 5s
private const val CLIENT_BUFFER_SIZE = 500 * 1024 * 1024 // 500MB
}
@Bean("lowLevelEsRestClient")
fun lowLevelEsRestClient(): RestClient {
return createBaseRestClientBuilder()
.build()
}
@Bean("esJavaClient")
fun esJavaClient(
@Qualifier("lowLevelEsRestClient") lowLevelEsRestClient: RestClient,
): ElasticsearchClient {
val restClientOptions = RestClientOptions(
RequestOptions.DEFAULT
.toBuilder()
.setHttpAsyncResponseConsumerFactory(
HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(CLIENT_BUFFER_SIZE),
)
.build(),
)
val transport = RestClientTransport(lowLevelEsRestClient, JacksonJsonpMapper(objectMapper), restClientOptions)
return ElasticsearchClient(transport)
}
@Bean
fun bulkIngester(
@Qualifier("esJavaClient") esJavaClient: ElasticsearchClient,
): BulkIngester<String> {
val listener = StringBulkIngestListener<String>()
return BulkIngester.of {
it.client(esJavaClient)
.maxOperations(200)
.maxConcurrentRequests(1)
.maxSize(5 * 1024 * 1024) // 5MB
.flushInterval(5L, TimeUnit.SECONDS)
.listener(listener)
}
}
@Bean("highLevelEsRestClient")
fun highLevelEsRestClient(
@Qualifier("lowLevelEsRestClient") lowLevelEsRestClient: RestClient,
): RestHighLevelClient {
return RestHighLevelClientBuilder(lowLevelEsRestClient)
// NOTE es 8 이상일 경우 서버 호환을 위해 true로 설정
.setApiCompatibilityMode(true)
.build()
}
private fun createBaseRestClientBuilder(): RestClientBuilder {
return RestClient
.builder(
HttpHost("localhost", 9200, "http"),
)
.setRequestConfigCallback {
it.setConnectTimeout(ES_CONNECTION_TIMEOUT)
it.setSocketTimeout(ES_SOCKET_TIMEOUT)
}
}
}
- spring application configuration은 위와 같이 설정할 수 있다.
- 위 예시는 책 예제를 기반으로 작성한것으로 고수준 RestClient와 JavaClient 모두를 사용하는 방식으로 작성되어있는데 상황에 따라 적절히 선택하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootApplication(
scanBasePackages = [
"com.starter.es",
"com.starter.core.common",
"com.starter.core.jasypt",
"com.starter.core.security",
],
exclude = [
DataSourceAutoConfiguration::class,
DataSourceTransactionManagerAutoConfiguration::class,
HibernateJpaAutoConfiguration::class,
ElasticsearchClientAutoConfiguration::class,
],
)
class EsApplication
fun main(args: Array<String>) {
runApplication<EsApplication>(*args)
}
- application 설정시
ElasticsearchClientAutoConfiguration::class
을 exclude를 해주어야 한다. - spring auto configuration을 사용한다면 위 설정 말고도 적절히 라이브러리 디펜던시 추가가 필요하고, bean 설정도 변경이 필요할것이다.
- 개인적으로는 auto configuration보다는 직접 bean 정의를 하는것이 명확해서 좋은 것 같음.
자바 클라이언트 사용하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@Service
class EsJavaClientService(
private val elasticsearchClient: ElasticsearchClient,
private val bulkIngester: BulkIngester<String>,
) {
companion object : KLogging() {
private const val INDEX_NAME = "my-index"
}
fun indexExample(): String {
val indexRequest = IndexRequest.Builder<MyIndexClass>()
.index(INDEX_NAME)
.id("my-id-1")
.routing("my-routing-1")
.document(MyIndexClass("hello", 1L, createNow()))
.build()
val response = elasticsearchClient.index(indexRequest)
val result = response.result()
logger.debug { "[ES_TEST] response : $response , resultName : ${result.name}" }
return response.toString()
}
fun getSample(id: String): MyIndexClass? {
val getRequest = GetRequest.Builder()
.index(INDEX_NAME)
.id(id)
.routing("my-routing-1")
.build()
val response = elasticsearchClient.get(getRequest, MyIndexClass::class.java)
val result = response.source()
logger.debug { "[ES_TEST] response : $response , result : $result" }
return result
}
fun bulk() {
val createOperation = CreateOperation.Builder<MyIndexClass>()
.index(INDEX_NAME)
.id("my-id-2")
.routing("my-routing-2")
.document(MyIndexClass("world", 2L, createNow()))
.build()
val indexOperation = IndexOperation.Builder<MyIndexClass>()
.index(INDEX_NAME)
.id("my-id-3")
.routing("my-routing-3")
.document(MyIndexClass("world", 4L, createNow()))
.build()
val updateAction = UpdateAction.Builder<MyIndexClass, MyPartialIndexClass>()
.doc(MyPartialIndexClass("world updated"))
.build()
val updateOperation = UpdateOperation.Builder<MyIndexClass, MyPartialIndexClass>()
.index(INDEX_NAME)
.id("my-id-1")
.routing("my-routing-1")
.action(updateAction)
.build()
val bulkOpOne = BulkOperation.Builder().create(createOperation).build()
val bulkOpTwo = BulkOperation.Builder().index(indexOperation).build()
val bulkOpThree = BulkOperation.Builder().update(updateOperation).build()
val operations = listOf<BulkOperation>(bulkOpOne, bulkOpTwo, bulkOpThree)
val bulkResponse = elasticsearchClient.bulk { it.operations(operations) }
for (item in bulkResponse.items()) {
logger.debug { "[ES_TEST] results : ${item.result()}, error : ${item.error()}" }
}
}
fun search(fieldOneValue: String): List<MyIndexClass> {
val searchRequest = SearchRequest.Builder()
.index(INDEX_NAME)
.from(0)
.size(10)
.query { q ->
q.term { t ->
t.field("fieldOne")
.value { v -> v.stringValue(fieldOneValue) }
}
}
.build()
val response = elasticsearchClient.search(searchRequest, MyIndexClass::class.java)
val hits = response.hits().hits()
for (item in hits) {
logger.debug { "[ES_TEST] source : ${item.source()}" }
}
return hits.mapNotNull { it.source() }
}
fun bulkWithIngester() {
val startDatetime = LocalDateTime.now()
val epochSecond = startDatetime.toEpochSecond(ZoneOffset.UTC)
logger.debug { "[ES_TEST] startEpochSecond : $epochSecond" }
for (number in 0L..1000L) {
val bulkOperation = BulkOperation.of { builder ->
builder.index { indexOpBUilder ->
indexOpBUilder
.index(INDEX_NAME)
.id("my-id-$number")
.routing("my-routing-$number")
.document(MyIndexClass("world", concatNumber(epochSecond, number), createNow()))
}
}
bulkIngester.add(bulkOperation, "my-context-$number")
}
logger.debug { "[ES_TEST] [${LocalDateTime.now()}] sleep 10 seconds ..." }
Thread.sleep(10000L)
for (number in 1001L..1500L) {
val bulkOperation = BulkOperation.of { builder ->
builder.index { indexOpBUilder ->
indexOpBUilder
.index(INDEX_NAME)
.id("my-id-$number")
.routing("my-routing-$number")
.document(MyIndexClass("world", concatNumber(epochSecond, number), createNow()))
}
}
bulkIngester.add(bulkOperation, "my-context-$number")
}
logger.debug { "[ES_TEST] [${LocalDateTime.now()}] sleep 10 seconds ..." }
Thread.sleep(10000L)
logger.debug { "[ES_TEST] It's completed." }
// bean이므로 굳이 닫지않음.
// bulkIngester.close()
}
private fun createNow() = ZonedDateTime.now(ZoneOffset.UTC)
private fun concatNumber(baseNumber: Long, number: Long): Long {
return String.format("%d%04d", baseNumber, number).toLong()
}
}
- 위 예시에는 단건문서 색인, 조회, bulk API 호출, search, bulkIngester 사용하는 예제까지 모두 포함되어있다.
JavaClient의 주요 특징
- 라이브러리 코드를 살펴보면 java 8부터 지원하는 Function을 잘 지원하고 있다.
- 책에서는 람다의 깊이를
_0, _1, _2 ...
와 같은 방식으로 표현하라고 되어있으나 이는 가독성에 좋은 방법이 아니라고 생각된다. - 이때 Function도 적당히 사용해야 가독성이 좋으니 가독성을 유지할수 있는 적당한 수준으로 작성하자.
- Function 사용 대신 실제 Builder 클래스를 build()하는 방식으로도 구현 가능하다.
- 책에서는 람다의 깊이를
- Operation을 처리할떄 Generic Type을 지원하는 것이 주된 특징이다.
- 기존 고수준 restClient에서는 이런 타입 핸들링이 불가능했던것으로 보이고, 이걸 역직렬화 처리를 Map 또는 JsonString으로부터 해줬어야 했으나 이런 부분이 개선된 부분인것 같다.
BulkIngester 사용 관련
- ES 8.7 버전부터 JavaClient에서 Bulk 처리를 위한 BulkIngester가 추가되었다.
- maxOperations, maxConcurrentRequests, maxSize, flushInterval 등의 builder 메소드를 사용해서 bulk API 처리 정책을 정할 수 있다.
- 고수준 RestClient의 BulkProcessor처럼 Listener을 지정하여 전/후처리에 대한 동작을 정의할 수 있다.
- BulkProcessor와 다르게 Generic을 지원한다.
- 그래서 인덱스 클래스 타입을 지정하여 특정 인덱스에 대한 타입 안정성을 가져갈수 있겠으나, 이런 경우 범용성은 떨어질수 있을것 같다.
- 필요에 따라서 판단해야겠지만 범용적인 하나의 BulkIngester을 사용해야 한다면 Generic Type을 String으로 지정하여 사용할 수 있을것 같다.
BulkIngester thread-safety
- BulkIngester도 개발/테스트 과정에서 확인해보니 thread-safety를 보장하여 spring bean으로 등록하고 사용하는것이 문제 없는것으로 확인했다.
- 코드를 확인해보니 add하기 전에 상태를 체크해서 동기화 처리를 하고 있음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class BulkIngester<Context> implements AutoCloseable {
// ...(중략)
// Synchronization objects
private final ReentrantLock lock = new ReentrantLock();
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
// ...(중략)
public void add(BulkOperation operation, Context context) {
if (isClosed) {
throw new IllegalStateException("Ingester has been closed");
}
IngesterOperation ingestOp = IngesterOperation.of(operation, client._jsonpMapper());
addCondition.whenReady(() -> {
// ...(중략)
}
}
class FnCondition {
// ...(중략)
public <T> T whenReadyIf(BooleanSupplier canRun, Supplier<T> fn) {
lock.lock();
try {
if (canRun != null && !canRun.getAsBoolean()) {
return null;
}
invocations++;
boolean firstLoop = true;
while (!ready.getAsBoolean()) {
if (firstLoop) {
contentions++;
firstLoop = false;
}
condition.awaitUninterruptibly();
}
if (canRun != null && !canRun.getAsBoolean()) {
return null;
}
return fn.get();
} finally {
lock.unlock();
}
}
}
Reference
- Github Sample Code
- 샘플코드는 위 Repo에서 확인할수 있다.
- BulkProcessor thread-safety 관련
- BulkIngester thread-safety 관련
Comments powered by Disqus.