Skip to content

Commit

Permalink
fix: allow reindexing indexes while upgrading - EXO-64127
Browse files Browse the repository at this point in the history
  • Loading branch information
ahamdi committed Feb 12, 2024
1 parent 65fc1f9 commit 198e0f4
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 62 deletions.
2 changes: 1 addition & 1 deletion commons-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<packaging>jar</packaging>
<name>eXo PLF:: Commons - Commons Search</name>
<properties>
<exo.test.coverage.ratio>0.33</exo.test.coverage.ratio>
<exo.test.coverage.ratio>0.4</exo.test.coverage.ratio>
<org.hamcrest.version>1.3</org.hamcrest.version>
</properties>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public enum OperationType {
INIT("I"),
CREATE("C"),
UPDATE("U"),
DELETE("D");
DELETE("D"),
DELETE_ALL("X");

private final String operationId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
public class ElasticIndexingAuditTrail {
public static final String REINDEX_ALL = "reindex_all";

public static final String DELETE_ALL = "delete_all";

public static final String CREATE_INDEX = "create_index";

public static final String DELETE_INDEX = "delete_index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ public void sendCUDRequest(String bulkRequest) {
logBulkResponse(response.getMessage(), (System.currentTimeMillis() - startTime));
}

/**
* Send request to ES to delete all documents of the given type
*/
public void sendDeleteAllDocsRequest(String index) {
long startTime = System.currentTimeMillis();
String request = getDeleteAllDocumentsRequestContent();
ElasticResponse response = sendHttpPostRequest(
urlClient + "/" + index
+ "/_delete_by_query?conflicts=proceed&wait_for_completion=true",
request);
auditTrail.audit(ElasticIndexingAuditTrail.DELETE_ALL,
null,
index,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
}
/**
* Send request to ES to create a new Ingest pipeline for attachment
*
Expand Down Expand Up @@ -205,13 +222,13 @@ public void sendCreateIndexAliasRequest(String index, String oldIndex, String in
*/
@SuppressWarnings("unchecked")
public Set<String> sendGetIndexAliasesRequest(String index) {
String indexAliasURL = urlClient + "/" + index + "/_aliases/";
String indexAliasURL = urlClient + "/" + index + "/_alias/";
ElasticResponse responseExists = sendHttpGetRequest(indexAliasURL);
// Test if he alias already exists
if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
// Get all aliases information
String aliasesURL = urlClient + "/_aliases";
ElasticResponse responseAliases = sendHttpGetRequest(indexAliasURL);
ElasticResponse responseAliases = sendHttpGetRequest(aliasesURL);
// An ES communication can happen, so throw an exception
if (responseAliases.getStatusCode() != HttpStatus.SC_OK) {
throw new ElasticClientException("Can't get aliases from URL " + aliasesURL);
Expand Down Expand Up @@ -318,15 +335,14 @@ public void sendDeleteIndexRequest(String index) {
* This operation reindex the documents from old index/type to new index/type
* mapping. A pipeline could be used when reindexing in case Ingest Attachment
* plugin is used by a target type.
*
* @param index target index name
*
* @param index target index name
* @param oldIndex source index name
* @param type source type name
* @param pipeline target pipeline name (optional)
* @return
*/
public void sendReindexTypeRequest(String index, String oldIndex, String type, String pipeline) {
public void sendReindexTypeRequest(String index, String oldIndex, String pipeline) {
long startTime = System.currentTimeMillis();
String request = getReindexRequestContent(index, oldIndex, type, pipeline);
String request = getReindexRequestContent(index, oldIndex, pipeline);
ElasticResponse response = sendHttpPostRequest(urlClient + "/_reindex", request);
auditTrail.audit(ElasticIndexingAuditTrail.REINDEX_TYPE,
null,
Expand All @@ -335,7 +351,7 @@ public void sendReindexTypeRequest(String index, String oldIndex, String type, S
response.getMessage(),
(System.currentTimeMillis() - startTime));
if (response.getStatusCode() != HttpStatus.SC_OK) {
throw new ElasticClientException("Can't reindex index " + index + ", type = " + type + ", reqponse code = "
throw new ElasticClientException("Can't reindex index " + index + ", response code = "
+ response.getStatusCode()
+ ", message = " + response.getMessage());
}
Expand Down Expand Up @@ -419,26 +435,36 @@ private void logBulkResponseItem(JSONObject item, long executionTime) {
}
}
}

private String getDeleteAllDocumentsRequestContent() {
JSONObject deleteAllRequest = new JSONObject();
JSONObject deleteQueryRequest = new JSONObject();
deleteQueryRequest.put("match_all", new JSONObject());
deleteAllRequest.put("query", deleteQueryRequest);

String request = deleteAllRequest.toJSONString();
LOG.debug("Delete All request to ES: \n {}", request);
return request;
}

@SuppressWarnings({ "unchecked" })
private String getReindexRequestContent(String index, String oldIndex, String type, String pipeline) {
private String getReindexRequestContent(String index, String oldIndex, String pipeline) {
JSONObject reindexRequest = new JSONObject();

JSONObject reindexSourceRequest = new JSONObject();
reindexRequest.put("source", reindexSourceRequest);
reindexSourceRequest.put("index", oldIndex);
reindexSourceRequest.put("type", type);
reindexRequest.put("source", reindexSourceRequest);

JSONObject reindexDestRequest = new JSONObject();
reindexRequest.put("dest", reindexDestRequest);
reindexDestRequest.put("index", index);
if (pipeline != null) {
reindexDestRequest.put("pipeline", pipeline);
}
reindexRequest.put("dest", reindexDestRequest);

String request = reindexRequest.toJSONString();

LOG.debug("Reindex Request from old index {} type {} to new index : \n {}", oldIndex, type, index, request);
LOG.debug("Reindex Request from old index {} type {} to new index : \n {}", oldIndex, index, request);
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,6 @@ public ElasticSearchingClient(ElasticIndexingAuditTrail auditTrail) {
initHttpClient();
}

/**
* No need to ES Type anymore, this method will be removed
* shortly
*
* @param esQuery
* @param index
* @param type
* @return
*/
@Deprecated
public String sendRequest(String esQuery, String index, String type) {
if (LOG.isDebugEnabled()) {
// Display stack trace
LOG.warn(new IllegalStateException("This method has been deprecated and will be removed in future releases."));
} else {
LOG.warn("This method has been deprecated and will be removed in future releases. To see stack trace, you can enable debug level on this class.");
}
return sendRequest(esQuery, index);
}

public String sendRequest(String esQuery, String index) {
long startTime = System.currentTimeMillis();
StringBuilder url = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,4 @@ public interface IndexingService {
* @LevelAPI Experimental
*/
void unindex(String connectorName, String id);


}
}
Loading

0 comments on commit 198e0f4

Please sign in to comment.