我已經(jīng)實現(xiàn)了一項服務(wù),Quarkus作為主要框架,Elasticsearch作為數(shù)據(jù)存儲。在實現(xiàn)過程中,我萌生了寫一篇關(guān)于如何使用Elasticsearch 的 Java High Level REST Client以反應(yīng)式方式綁定 Quarkus 的想法。
開始對文章做筆記,將常用庫(otaibe-commons-quarkus-elasticsearch模塊)中常用的Elasticsearch相關(guān)代碼分離出來存放在Github中。然后,我花了幾個小時以 Quarkus 指南頁面中的方式組裝了一個簡單的示例項目(也在 Github 中) 。目前,那里缺少 Elasticsearch 指南。
讓我們繼續(xù)更詳細地解釋如何連接 Quarkus 和 Elasticsearch。
創(chuàng)建 Quarkus 項目
mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
-DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \
-DprojectArtifactId=otaibe-quarkus-elasticsearch-example \
-DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \
-Dpath="/fruits" \
-Dextensions="resteasy-jackson,elasticsearch-rest-client"
Maven 設(shè)置
如您所見,Quarkus 中存在一個elasticsearch -rest-client ;然而,這是一個 Elasticsearch Java 低級 REST 客戶端。如果我們想使用 Elasticsearch Java High Level REST Client,我們只需要將它作為依賴添加到pom.xml文件中:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.0</version>
</dependency>
請確保 Elasticsearch Java Low Level REST Client 的版本與 Elasticsearch Java High Level REST Client匹配。
由于我們以響應(yīng)式方式使用 Elasticsearch,因此我更喜歡使用 Project Reactor。我們必須在依賴管理部分添加 BOM:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
我們還必須添加 reactor-core 作為依賴項:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
我已經(jīng)在一個庫中分離了公共代碼,所以我們應(yīng)該將這個庫添加到我們的示例項目中。為此,我們將使用Jitpack。這是一項很棒的服務(wù)。你只需要為 你的
Github 項目指出 正確的方法,它就會為它構(gòu)建一個工件。這是我使用它的方式:
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-core</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-elasticsearch</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-rest</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
通過 Docker 啟動 Elasticsearch
此外,我們應(yīng)該啟動 Elastisearch。最簡單的方法是通過 Docker 運行它:
docker run -it --rm=true --name elasticsearch_quarkus_test \
-p 11027:9200 -p 11028:9300 \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.4.0
連接到 Elasticsearch
讓我們從將我們的服務(wù)連接到 Elasticsearch 開始——示例項目中的實現(xiàn)很簡單——因此它將偵聽 Quarkus 啟動和關(guān)閉事件并初始化或終止連接:
package org.otaibe.quarkus.elasticsearch.example.service;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
@Getter
@Setter
@Slf4j
public class ElasticsearchService extends AbstractElasticsearchService {
public void init(@Observes StartupEvent event) {
log.info("init started");
super.init();
log.info("init completed");
}
public void shutdown(@Observes ShutdownEvent event) {
log.info("shutdown started");
super.shutdown();
log.info("shutdown completed");
}
}
連接到 Elasticsearch 的實際工作是在AbstractElasticsearchService 中完成的:
public abstract class AbstractElasticsearchService {
@ConfigProperty(name = "service.elastic-search.hosts")
String[] hosts;
@ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")
Optional<Integer> numThreads;
private RestHighLevelClient restClient;
private Sniffer sniffer;
@PostConstruct
public void init() {
log.info("init started");
List<HttpHost> httpHosts = Arrays.stream(hosts)
.map(s -> StringUtils.split(s, ':'))
.map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))
.collect(Collectors.toList());
RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
getNumThreads().ifPresent(integer ->
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig
.custom()
.setIoThreadCount(integer)
.build())
));
restClient = new RestHighLevelClient(builder);
sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();
log.info("init completed");
}
}
如您所見,此處的連接是按照Elasticsearch 文檔 中建議的方式完成的。我的實現(xiàn)取決于兩個配置屬性:
屬性文件:
service.elastic-search.hosts=localhost:11027
這是從 Docker 啟動后的 Elasticsearch 連接字符串。第二個可選屬性是:屬性文件
service.elastic-search.num-threads
這是客戶端所需的線程數(shù)。
創(chuàng)建 POJO
現(xiàn)在,讓我們創(chuàng)建域?qū)ο螅?a >Fruit):
package org.otaibe.quarkus.elasticsearch.example.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class Fruit {
public static final String ID = "id";
public static final String EXT_REF_ID = "ext_ref_id";
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
public static final String VERSION = "version";
@JsonProperty(ID)
public String id;
@JsonProperty(EXT_REF_ID)
public String extRefId;
@JsonProperty(NAME)
public String name;
@JsonProperty(DESCRIPTION)
public String description;
@JsonProperty(VERSION)
public Long version;
}
創(chuàng)建和實現(xiàn) DAO
創(chuàng)建索引
讓我們創(chuàng)建 FruitDaoImpl。它是一個高級類,用于填充 AbstractElasticsearchReactiveDaoImplementation 并實現(xiàn)所需的業(yè)務(wù)邏輯。這里的另一個重要部分是為 Fruit 類創(chuàng)建索引:
@Override
protected Mono<Boolean> createIndex() {
CreateIndexRequest request = new CreateIndexRequest(getTableName());
Map<String, Object> mapping = new HashMap();
Map<String, Object> propsMapping = new HashMap<>();
propsMapping.put(Fruit.ID, getKeywordTextAnalizer());
propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());
propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));
propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));
propsMapping.put(Fruit.VERSION, getLongFieldType());
mapping.put(PROPERTIES, propsMapping);
request.mapping(mapping);
return createIndex(request);
}
對 Elasticsearch 的真正創(chuàng)建索引調(diào)用是在父類 ( AbstractElasticsearchReactiveDaoImplementation )
中實現(xiàn)的:
protected Mono<Boolean> createIndex(CreateIndexRequest request) {
return Flux.<Boolean>create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
log.info("CreateIndexResponse: {}", createIndexResponse);
fluxSink.next(createIndexResponse.isAcknowledged());
fluxSink.complete();
}
@Override
public void onFailure(Exception e) {
log.error("unable to create index", e);
fluxSink.error(new RuntimeException(e));
}
}))
.next();
}
玩轉(zhuǎn) DAO
大多數(shù) CRUD 操作在AbstractElasticsearchReactiveDaoImplementation中實現(xiàn) 。
它有 save、 update、 findById和 deleteById 公共方法。它也有findByExactMatch和 findByMatch保護方法。FindBy*當(dāng)需要填充業(yè)務(wù)邏輯時,這些 方法在后代類中非常有用。
業(yè)務(wù)查找方法在FruitDaoImpl 類中實現(xiàn) :
public Flux<Fruit> findByExternalRefId(String value) {
return findByMatch(Fruit.EXT_REF_ID, value);
}
public Flux<Fruit> findByName(String value) {
return findByMatch(Fruit.NAME, value);
}
public Flux<Fruit> findByDescription(String value) {
return findByMatch(Fruit.NAME, value);
}
public Flux<Fruit> findByNameOrDescription(String value) {
Map<String, Object> query = new HashMap<>();
query.put(Fruit.NAME, value);
query.put(Fruit.DESCRIPTION, value);
return findByMatch(query);
}
在Service類中封裝 DAO
FruitDaoImpl 封裝在 FruitService 中:
@ApplicationScoped
@Getter
@Setter
@Slf4j
public class FruitService {
@Inject
FruitDaoImpl dao;
public Mono<Fruit> save(Fruit entity) {
return getDao().save(entity);
}
public Mono<Fruit> findById(Fruit entity) {
return getDao().findById(entity);
}
public Mono<Fruit> findById(String id) {
return Mono.just(Fruit.of(id, null, null, null, null))
.flatMap(entity -> findById(entity));
}
public Flux<Fruit> findByExternalRefId(String value) {
return getDao().findByExternalRefId(value);
}
public Flux<Fruit> findByName(String value) {
return getDao().findByName(value);
}
public Flux<Fruit> findByDescription(String value) {
return getDao().findByDescription(value);
}
public Flux<Fruit> findByNameOrDescription(String value) {
return getDao().findByNameOrDescription(value);
}
public Mono<Boolean> delete(Fruit entity) {
return Mono.just(entity.getId())
.filter(s -> StringUtils.isNotBlank(s))
.flatMap(s -> getDao().deleteById(entity))
.defaultIfEmpty(false);
}
}
測試 FruitService
該 FruitServiceTests 寫入,以測試基本功能。它還用于確保 Fruit 類字段被正確索引并且全文搜索按預(yù)期工作:
@Test
public void manageFruitTest() {
Fruit apple = getTestUtils().createApple();
Fruit apple1 = getFruitService().save(apple).block();
Assertions.assertNotNull(apple1.getId());
Assertions.assertTrue(apple1.getVersion() > 0);
log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));
List<Fruit> fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();
Assertions.assertTrue(fruitList.size() > 0);
List<Fruit> fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();
Assertions.assertTrue(fruitList1.size() > 0);
//Ensure that the full text search is working - it is 'Apples' in description
List<Fruit> fruitList2 = getFruitService().findByDescription("apple").collectList().block();
Assertions.assertTrue(fruitList2.size() > 0);
//Ensure that the full text search is working - it is 'Apple' in name
List<Fruit> fruitList3 = getFruitService().findByName("apples").collectList().block();
Assertions.assertTrue(fruitList3.size() > 0);
Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();
Assertions.assertTrue(deleteAppleResult);
}
添加 REST 端點
因為這是一個示例項目,完整的 CRUD 功能不會作為 REST 端點添加。只有save和 findById被添加為 REST 端點。它們被添加到 FruitResource 中。那里的方法返回 CompletionStage<Response>,這確保我們的應(yīng)用程序中不會有阻塞的線程。
測試 REST 端點
添加FruitResourceTest以測試 RESTendpoints:
package org.otaibe.quarkus.elasticsearch.example.web.controller;
import io.quarkus.test.junit.QuarkusTest;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.otaibe.commons.quarkus.core.utils.JsonUtils;
import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;
import org.otaibe.quarkus.elasticsearch.example.service.FruitService;
import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;
import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Optional;
import static io.restassured.RestAssured.given;
@QuarkusTest
@Getter(value = AccessLevel.PROTECTED)
@Slf4j
public class FruitResourceTest {
@ConfigProperty(name = "service.http.host")
Optional<URI> host;
@Inject
TestUtils testUtils;
@Inject
JsonUtils jsonUtils;
@Inject
FruitService service;
@Test
public void restEndpointsTest() {
log.info("restEndpointsTest start");
Fruit apple = getTestUtils().createApple();
Fruit savedApple = given()
.when()
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.body(apple)
.post(getUri(FruitResource.ROOT_PATH))
.then()
.statusCode(200)
.extract()
.as(Fruit.class);
String id = savedApple.getId();
Assertions.assertTrue(StringUtils.isNotBlank(id));
URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)
.path(id)
.build();
Fruit foundApple = given()
.when().get(getUri(findByIdPath.getPath()).getPath())
.then()
.statusCode(200)
.extract()
.as(Fruit.class);
Assertions.assertEquals(savedApple, foundApple);
Boolean deleteResult = getService().delete(foundApple).block();
Assertions.assertTrue(deleteResult);
given()
.when().get(findByIdPath.getPath())
.then()
.statusCode(Response.Status.NOT_FOUND.getStatusCode()) ;
log.info("restEndpointsTest end");
}
private URI getUri(String path) {
return getUriBuilder(path)
.build();
}
private UriBuilder getUriBuilder(String path) {
return getHost()
.map(uri -> UriBuilder.fromUri(uri))
.map(uriBuilder -> uriBuilder.path(path))
.orElse(UriBuilder
.fromPath(path)
);
}
}
構(gòu)建本地可執(zhí)行文件
在構(gòu)建本機可執(zhí)行文件之前,我們必須注冊我們的 Fruit 域?qū)ο蟆_@樣做的原因是我們的 FruitResource 返回 CompletionStage<Response>,因此,應(yīng)用程序的實際返回類型是未知的,因此我們必須顯式注冊它以進行反射。在 Quarkus 中至少有兩種方法可以做到這一點:
- 通過 @RegisterForReflection 注釋。
- 通過 反射-config.json。
我個人更喜歡第二種方法,因為您要注冊的類可能在第三方庫中,并且不可能將 @RegisterForReflection 放在 那里。
現(xiàn)在, reflection-config.json 看起來像這樣:
[
{
"name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",
"allDeclaredConstructors" : true,
"allPublicConstructors" : true,
"allDeclaredMethods" : true,
"allPublicMethods" : true,
"allDeclaredFields" : true,
"allPublicFields" : true
}
]
下一步是讓 Quarkus 知道 reflection-config.json 文件。您應(yīng)該將此行添加到pom.xml文件中的 native
配置文件中:
<quarkus.native.additional-build-args>-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json</quarkus.native.additional-build-args>
您現(xiàn)在可以構(gòu)建您的本機應(yīng)用程序:
mvn clean package -Pnative
并啟動它:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
該服務(wù)將在http://localhost:11025上可用,因為這是application.properties 中明確指定的端口。
quarkus.http.port=11025
測試本機構(gòu)建
該 FruitResourceTest 預(yù)計以下可選屬性:
屬性文件:
service.http.host
如果存在,測試請求將命中指定的主機。如果您啟動本機可執(zhí)行文件:
shell:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
并使用以下代碼執(zhí)行測試/構(gòu)建:
shell:
mvn package -D %test.service.http .host = http://localhost:11025
測試將針對本機構(gòu)建運行。
結(jié)論
我驚喜地發(fā)現(xiàn) Elasticsearch 與 Quarkus 一起開箱即用,可以編譯為本地代碼,結(jié)合通過Project Reactor 的反應(yīng)式實現(xiàn) ,將使應(yīng)用程序的占用空間幾乎微不足道。