[Backport 2.x] Improve Health Status of security-auditlog Index in Single Node Clusters #5085

Merged: 1 commit, Feb 4, 2025
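In short, this backport makes the internal audit sinks create the security-auditlog index explicitly, with one primary shard and auto-expanding replicas, before the first audit message is written. On a single-node cluster the index therefore allocates no replica copies and reports green instead of yellow; once a second node joins, a replica is added automatically. The sketch below only restates the settings the sink applies; the use of the Settings builder here is illustrative and not part of the diff, which builds the same values with an ImmutableMap.

import org.opensearch.common.settings.Settings;

// Illustrative sketch of the index settings the sink passes to CreateIndexRequest.
// "0-1" means: keep 0 replicas while only one node is available (index stays green)
// and expand to 1 replica automatically when another node joins.
Settings auditIndexSettings = Settings.builder()
    .put("index.number_of_shards", 1)
    .put("index.auto_expand_replicas", "0-1")
    .build();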
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.security;

import java.io.IOException;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.opensearch.test.framework.AuditCompliance;
import org.opensearch.test.framework.AuditConfiguration;
import org.opensearch.test.framework.AuditFilters;
import org.opensearch.test.framework.cluster.ClusterManager;
import org.opensearch.test.framework.cluster.LocalCluster;
import org.opensearch.test.framework.cluster.TestRestClient;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;

@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class InternalAuditLogTest {

private static final Logger log = LogManager.getLogger(InternalAuditLogTest.class);

@ClassRule
public static final LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.SINGLENODE)
.anonymousAuth(false)
.authc(AUTHC_HTTPBASIC_INTERNAL)
.users(USER_ADMIN)
.internalAudit(
new AuditConfiguration(true).compliance(new AuditCompliance().enabled(true))
.filters(new AuditFilters().enabledRest(true).enabledTransport(true))
)
.build();

@Test
public void testAuditLogShouldBeGreenInSingleNodeCluster() throws IOException {
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
client.get(""); // demo request for insuring audit-log index is created beforehand
TestRestClient.HttpResponse indicesResponse = client.get("_cat/indices");

assertThat(indicesResponse.getBody(), containsString("security-auditlog"));
assertThat(indicesResponse.getBody(), containsString("green"));
}
}
}
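Side note: an equivalent assertion could query index health directly instead of grepping _cat/indices. The variant below is only a sketch of how that might look with the same TestRestClient; the _cluster/health request and the JSON status field are standard OpenSearch APIs and are not part of this change.

// Hypothetical alternative to the assertion above, not part of this PR:
// ask the cluster health API for the audit-log indices and check the reported status.
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
    client.get(""); // ensure the audit-log index has been created
    TestRestClient.HttpResponse health = client.get("_cluster/health/security-auditlog-*?level=indices");
    assertThat(health.getBody(), containsString("\"status\":\"green\""));
}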
@@ -475,6 +475,18 @@ public Builder audit(AuditConfiguration auditConfiguration) {
return this;
}

public Builder internalAudit(AuditConfiguration auditConfiguration) {
if (auditConfiguration != null) {
testSecurityConfig.audit(auditConfiguration);
}
if (auditConfiguration.isEnabled()) {
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "internal_opensearch");
} else {
nodeOverrideSettingsBuilder.put("plugins.security.audit.type", "noop");
}
return this;
}

public List<TestSecurityConfig.User> getUsers() {
return testSecurityConfig.getUsers();
}
@@ -69,7 +69,7 @@ public AuditLogImpl(
) {
super(settings, threadPool, resolver, clusterService, environment);
this.settings = settings;
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath);
this.messageRouter = new AuditMessageRouter(settings, clientProvider, threadPool, configPath, clusterService);
this.messageRouterEnabled = this.messageRouter.isEnabled();

log.info("Message routing enabled: {}", this.messageRouterEnabled);
@@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.config.ThreadPoolConfig;
import org.opensearch.security.auditlog.impl.AuditCategory;
@@ -43,9 +44,15 @@ public class AuditMessageRouter {
final SinkProvider sinkProvider;
final AsyncStoragePool storagePool;

public AuditMessageRouter(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
public AuditMessageRouter(
final Settings settings,
final Client clientProvider,
ThreadPool threadPool,
final Path configPath,
final ClusterService clusterService
) {
this(
new SinkProvider(settings, clientProvider, threadPool, configPath),
new SinkProvider(settings, clientProvider, threadPool, configPath, clusterService),
new AsyncStoragePool(ThreadPoolConfig.getConfig(settings))
);
}
@@ -12,11 +12,15 @@
package org.opensearch.security.auditlog.sink;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
@@ -29,7 +33,9 @@ public abstract class AbstractInternalOpenSearchSink extends AuditLogSink {

protected final Client clientProvider;
private final ThreadPool threadPool;
protected final ClusterService clusterService;
private final DocWriteRequest.OpType storeOpType;
final static Map<String, Object> indexSettings = ImmutableMap.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1");

public AbstractInternalOpenSearchSink(
final String name,
@@ -38,19 +44,23 @@ public AbstractInternalOpenSearchSink(
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink,
DocWriteRequest.OpType storeOpType
DocWriteRequest.OpType storeOpType,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, fallbackSink);
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.storeOpType = storeOpType;
this.clusterService = clusterService;
}

@Override
public void close() throws IOException {

}

protected abstract boolean createIndexIfAbsent(String indexName);

public boolean doStore(final AuditMessage msg, String indexName) {

if (Boolean.parseBoolean(
@@ -64,6 +74,12 @@ public boolean doStore(final AuditMessage msg, String indexName) {

try (StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
boolean ok = createIndexIfAbsent(indexName);
if (!ok) {
log.error("Failed to create index {}", indexName);
return false;
}

final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource(msg.getAsMap());
@@ -25,6 +25,7 @@
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
@@ -43,9 +44,10 @@ public InternalOpenSearchDataStreamSink(
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
AuditLogSink fallbackSink,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE, clusterService);
Settings sinkSettings = getSinkSettings(settingsPrefix);

this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog");
@@ -132,6 +134,12 @@ private boolean initDataStream() {
return this.dataStreamInitialized;
}

@Override
public boolean createIndexIfAbsent(String indexName) {
// datastream is initialized in initDataStream
return true;
}

@Override
public void close() throws IOException {

@@ -14,7 +14,10 @@
import java.io.IOException;
import java.nio.file.Path;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.support.ConfigConstants;
@@ -36,9 +39,10 @@ public InternalOpenSearchSink(
final Path configPath,
final Client clientProvider,
ThreadPool threadPool,
AuditLogSink fallbackSink
AuditLogSink fallbackSink,
ClusterService clusterService
) {
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null);
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, null, clusterService);

Settings sinkSettings = getSinkSettings(settingsPrefix);
this.index = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_INDEX, "'security-auditlog-'YYYY.MM.dd");
@@ -54,6 +58,23 @@ public InternalOpenSearchSink(
}
}

@Override
public boolean createIndexIfAbsent(String indexName) {
if (clusterService.state().metadata().hasIndex(indexName)) {
return true;
}

try {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings);
final boolean ok = clientProvider.admin().indices().create(createIndexRequest).actionGet().isAcknowledged();
log.info("Index {} created?: {}", indexName, ok);
return ok;
} catch (ResourceAlreadyExistsException resourceAlreadyExistsException) {
log.info("Index {} already exists", indexName);
return true;
}
}

@Override
public void close() throws IOException {

@@ -20,6 +20,7 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.security.dlic.rest.support.Utils;
import org.opensearch.security.support.ConfigConstants;
@@ -34,21 +35,35 @@ public class SinkProvider {
private final ThreadPool threadPool;
private final Path configPath;
private final Settings settings;
private final ClusterService clusterService;
final Map<String, AuditLogSink> allSinks = new HashMap<>();
AuditLogSink defaultSink;
AuditLogSink fallbackSink;

public SinkProvider(final Settings settings, final Client clientProvider, ThreadPool threadPool, final Path configPath) {
public SinkProvider(
final Settings settings,
final Client clientProvider,
ThreadPool threadPool,
final Path configPath,
final ClusterService clusterService
) {
this.settings = settings;
this.clientProvider = clientProvider;
this.threadPool = threadPool;
this.configPath = configPath;
this.clusterService = clusterService;

// fall back sink, make sure we don't lose messages
String fallbackConfigPrefix = ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + FALLBACKSINK_NAME;
Settings fallbackSinkSettings = settings.getAsSettings(fallbackConfigPrefix);
if (!fallbackSinkSettings.isEmpty()) {
this.fallbackSink = createSink(FALLBACKSINK_NAME, fallbackSinkSettings.get("type"), settings, fallbackConfigPrefix + ".config");
this.fallbackSink = createSink(
FALLBACKSINK_NAME,
fallbackSinkSettings.get("type"),
settings,
fallbackConfigPrefix + ".config",
clusterService
);
}

// make sure we always have a fallback to write to
Expand All @@ -63,7 +78,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
DEFAULTSINK_NAME,
settings.get(ConfigConstants.SECURITY_AUDIT_TYPE_DEFAULT),
settings,
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT
ConfigConstants.SECURITY_AUDIT_CONFIG_DEFAULT,
clusterService
);
if (defaultSink == null) {
log.error("Default endpoint could not be created, auditlog will not work properly.");
@@ -92,7 +108,8 @@ public SinkProvider(final Settings settings, final Client clientProvider, Thread
sinkName,
type,
this.settings,
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config"
ConfigConstants.SECURITY_AUDIT_CONFIG_ENDPOINTS + "." + sinkName + ".config",
clusterService
);
if (sink == null) {
log.error("Endpoint '{}' could not be created, check log file for further information.", sinkName);
@@ -128,12 +145,27 @@ protected void close(AuditLogSink sink) {
}
}

private final AuditLogSink createSink(final String name, final String type, final Settings settings, final String settingsPrefix) {
private final AuditLogSink createSink(
final String name,
final String type,
final Settings settings,
final String settingsPrefix,
final ClusterService clusterService
) {
AuditLogSink sink = null;
if (type != null) {
switch (type.toLowerCase()) {
case "internal_opensearch":
sink = new InternalOpenSearchSink(name, settings, settingsPrefix, configPath, clientProvider, threadPool, fallbackSink);
sink = new InternalOpenSearchSink(
name,
settings,
settingsPrefix,
configPath,
clientProvider,
threadPool,
fallbackSink,
clusterService
);
break;
case "internal_opensearch_data_stream":
sink = new InternalOpenSearchDataStreamSink(
Expand All @@ -143,7 +175,8 @@ private final AuditLogSink createSink(final String name, final String type, fina
configPath,
clientProvider,
threadPool,
fallbackSink
fallbackSink,
clusterService
);
break;
case "external_opensearch":
@@ -114,7 +114,7 @@ protected void validateJson(final String json) throws Exception { // this functi
}

protected AuditMessageRouter createMessageRouterComplianceEnabled(Settings settings) {
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
router.enableRoutes(settings);
return router;
}
@@ -70,7 +70,7 @@ public void testNoDefaultSink() throws Exception {
)
)
.build();
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null);
AuditMessageRouter router = new AuditMessageRouter(settings, null, null, null, null);
// no default sink, audit log not enabled
assertThat(router.isEnabled(), is(false));
assertThat(router.defaultSink, is(nullValue()));
@@ -64,7 +64,7 @@ public void testKafka() throws Exception {
consumer.subscribe(Arrays.asList("compliance"));

Settings settings = settingsBuilder.put("path.home", ".").build();
SinkProvider provider = new SinkProvider(settings, null, null, null);
SinkProvider provider = new SinkProvider(settings, null, null, null, null);
AuditLogSink sink = provider.getDefaultSink();
try {
assertThat(sink.getClass(), is(KafkaSink.class));