Skip to content

Commit

Permalink
Avoid port conflict in integration tests (apache#13390)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Jun 14, 2024
1 parent 228fa75 commit 4a428ec
Show file tree
Hide file tree
Showing 34 changed files with 584 additions and 751 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public String getInstanceId() {
}

@Override
public PinotConfiguration getConfig() {
public ControllerConf getConfig() {
return _config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ public class ControllerStarterStatelessTest extends ControllerTest {
private final Map<String, Object> _configOverride = new HashMap<>();

@Override
public Map<String, Object> getDefaultControllerConfiguration() {
Map<String, Object> defaultConfig = super.getDefaultControllerConfiguration();
defaultConfig.putAll(_configOverride);
return defaultConfig;
protected void overrideControllerConf(Map<String, Object> properties) {
properties.putAll(_configOverride);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control

private static final String TIER_NAME = "tier1";

@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
}

@BeforeClass
public void setUp()
throws Exception {
startZk();
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
startController(properties);
startController();

addFakeBrokerInstancesToAutoJoinHelixCluster(1, false);
addFakeServerInstancesToAutoJoinHelixCluster(2, false);
Expand Down Expand Up @@ -372,7 +375,7 @@ private Map<String, InstancePartitions> getInstancePartitionsMap()

private Map<String, InstancePartitions> deserializeInstancePartitionsMap(String instancePartitionsMapString)
throws Exception {
return JsonUtils.stringToObject(instancePartitionsMapString, new TypeReference<Map<String, InstancePartitions>>() {
return JsonUtils.stringToObject(instancePartitionsMapString, new TypeReference<>() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@
@Test(groups = "stateless")
public class PinotDummyExtraRestletResourceStatelessTest extends ControllerTest {

@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(CONTROLLER_RESOURCE_PACKAGES,
DEFAULT_CONTROLLER_RESOURCE_PACKAGES + ",org.apache.pinot.controller.api.extraresources");
}

@BeforeClass
public void setUp()
throws Exception {
startZk();
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(CONTROLLER_RESOURCE_PACKAGES,
DEFAULT_CONTROLLER_RESOURCE_PACKAGES + ",org.apache.pinot.controller.api.extraresources");
startController(properties);
startController();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void teardown() {
}

@Override
public ControllerStarter getControllerStarter() {
public ControllerStarter createControllerStarter() {
return new MockControllerStarter();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.helix;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
Expand All @@ -34,8 +33,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
Expand Down Expand Up @@ -77,7 +74,6 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand All @@ -87,16 +83,11 @@

import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.Assert.*;


public class ControllerTest {
public static final String LOCAL_HOST = "localhost";
public static final int DEFAULT_CONTROLLER_PORT = 18998;
public static final String DEFAULT_DATA_DIR = new File(FileUtils.getTempDirectoryPath(),
"test-controller-data-dir" + System.currentTimeMillis()).getAbsolutePath();
public static final String DEFAULT_LOCAL_TEMP_DIR = new File(FileUtils.getTempDirectoryPath(),
Expand All @@ -122,8 +113,13 @@ public class ControllerTest {

protected static HttpClient _httpClient = null;

private int _controllerPort;
private String _controllerBaseApiUrl;
protected int _nextControllerPort = 16000;
protected int _nextBrokerPort = 17000;
protected int _nextServerPort = 18000;
protected int _nextMinionPort = 19000;

protected int _controllerPort;
protected String _controllerBaseApiUrl;
protected ControllerConf _controllerConfig;
protected ControllerRequestURLBuilder _controllerRequestURLBuilder;

Expand Down Expand Up @@ -211,23 +207,35 @@ public String getZkUrl() {

public Map<String, Object> getDefaultControllerConfiguration() {
Map<String, Object> properties = new HashMap<>();

properties.put(ControllerConf.ZK_STR, getZkUrl());
properties.put(ControllerConf.HELIX_CLUSTER_NAME, getHelixClusterName());
properties.put(ControllerConf.CONTROLLER_HOST, LOCAL_HOST);
properties.put(ControllerConf.CONTROLLER_PORT,
NetUtils.findOpenPort(DEFAULT_CONTROLLER_PORT + RandomUtils.nextInt(10000)));
int controllerPort = NetUtils.findOpenPort(_nextControllerPort);
properties.put(ControllerConf.CONTROLLER_PORT, controllerPort);
if (_controllerPort == 0) {
_controllerPort = controllerPort;
}
_nextControllerPort = controllerPort + 1;
properties.put(ControllerConf.DATA_DIR, DEFAULT_DATA_DIR);
properties.put(ControllerConf.LOCAL_TEMP_DIR, DEFAULT_LOCAL_TEMP_DIR);
properties.put(ControllerConf.ZK_STR, getZkUrl());
properties.put(ControllerConf.HELIX_CLUSTER_NAME, getHelixClusterName());
// Enable groovy on the controller
properties.put(ControllerConf.DISABLE_GROOVY, false);
properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
overrideControllerConf(properties);
return properties;
}

/**
* Can be overridden to add more properties.
*/
protected void overrideControllerConf(Map<String, Object> properties) {
// do nothing, to be overridden by tests if they need something specific
}

/**
* Can be overridden to use a different implementation.
*/
public BaseControllerStarter createControllerStarter() {
return new ControllerStarter();
}

public void startController()
Expand All @@ -237,29 +245,16 @@ public void startController()

public void startController(Map<String, Object> properties)
throws Exception {
Preconditions.checkState(_controllerStarter == null);

_controllerConfig = new ControllerConf(properties);
assertNull(_controllerStarter, "Controller is already started");
assertTrue(_controllerPort > 0, "Controller port is not assigned");

String controllerScheme = "http";
if (StringUtils.isNotBlank(_controllerConfig.getControllerVipProtocol())) {
controllerScheme = _controllerConfig.getControllerVipProtocol();
}

_controllerPort = DEFAULT_CONTROLLER_PORT;
if (StringUtils.isNotBlank(_controllerConfig.getControllerPort())) {
_controllerPort = Integer.parseInt(_controllerConfig.getControllerPort());
} else if (StringUtils.isNotBlank(_controllerConfig.getControllerVipPort())) {
_controllerPort = Integer.parseInt(_controllerConfig.getControllerVipPort());
}

_controllerBaseApiUrl = controllerScheme + "://localhost:" + _controllerPort;
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = _controllerConfig.getDataDir();

_controllerStarter = getControllerStarter();
_controllerStarter = createControllerStarter();
_controllerStarter.init(new PinotConfiguration(properties));
_controllerStarter.start();
_controllerConfig = _controllerStarter.getConfig();
_controllerBaseApiUrl = _controllerConfig.generateVipUrl();
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = _controllerConfig.getDataDir();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
_helixDataAccessor = _helixManager.getHelixDataAccessor();
Expand Down Expand Up @@ -293,7 +288,7 @@ public void startController(Map<String, Object> properties)
}

public void stopController() {
Preconditions.checkState(_controllerStarter != null);
assertNotNull(_controllerStarter, "Controller hasn't been started");
_controllerStarter.stop();
_controllerStarter = null;
_controllerRequestClient = null;
Expand Down Expand Up @@ -394,30 +389,13 @@ public void onBecomeOfflineFromError(Message message, NotificationContext contex

public void addFakeServerInstancesToAutoJoinHelixCluster(int numInstances, boolean isSingleTenant)
throws Exception {
addFakeServerInstancesToAutoJoinHelixCluster(numInstances, isSingleTenant, Server.DEFAULT_ADMIN_API_PORT);
}

public void addFakeServerInstancesToAutoJoinHelixCluster(int numInstances, boolean isSingleTenant, int baseAdminPort)
throws Exception {
for (int i = 0; i < numInstances; i++) {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, isSingleTenant,
NetUtils.findOpenPort(baseAdminPort + i + RandomUtils.nextInt(10000)));
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, isSingleTenant);
}
}

public int getFakeServerInstanceCount() {
return _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size()
+ _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), UNTAGGED_SERVER_INSTANCE).size();
}

public void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant)
throws Exception {
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant,
NetUtils.findOpenPort(Server.DEFAULT_ADMIN_API_PORT + RandomUtils.nextInt(10000)));
}

public void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant, int adminPort)
throws Exception {
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT, getZkUrl());
helixManager.getStateMachineEngine()
Expand All @@ -433,33 +411,29 @@ public void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boole
}
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
getHelixClusterName()).forParticipant(instanceId).build();
helixAdmin.setConfig(configScope,
Collections.singletonMap(Helix.Instance.ADMIN_PORT_KEY, Integer.toString(adminPort)));
int adminPort = NetUtils.findOpenPort(_nextServerPort);
helixAdmin.setConfig(configScope, Map.of(Helix.Instance.ADMIN_PORT_KEY, Integer.toString(adminPort)));
_nextServerPort = adminPort + 1;
_fakeInstanceHelixManagers.add(helixManager);
}

/** Add fake server instances until total number of server instances reaches maxCount */
public void addMoreFakeServerInstancesToAutoJoinHelixCluster(int maxCount, boolean isSingleTenant)
throws Exception {
addMoreFakeServerInstancesToAutoJoinHelixCluster(maxCount, isSingleTenant,
NetUtils.findOpenPort(DEFAULT_ADMIN_API_PORT + RandomUtils.nextInt(10000)));
}

/** Add fake server instances until total number of server instances reaches maxCount */
public void addMoreFakeServerInstancesToAutoJoinHelixCluster(int maxCount, boolean isSingleTenant, int baseAdminPort)
throws Exception {

// get current instance count
int currentCount = getFakeServerInstanceCount();

// Add more instances if current count is less than max instance count.
if (currentCount < maxCount) {
for (int i = currentCount; i < maxCount; i++) {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, isSingleTenant, baseAdminPort + i);
}
for (int i = currentCount; i < maxCount; i++) {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, isSingleTenant);
}
}

public int getFakeServerInstanceCount() {
return _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size()
+ _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), UNTAGGED_SERVER_INSTANCE).size();
}

public static class FakeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
private static final String STATE_MODEL_DEF = "SegmentOnlineOfflineStateModel";

Expand Down Expand Up @@ -933,6 +907,10 @@ public HelixAdmin getHelixAdmin() {
return _helixAdmin;
}

public BaseControllerStarter getControllerStarter() {
return _controllerStarter;
}

public PinotHelixResourceManager getHelixResourceManager() {
return _helixResourceManager;
}
Expand All @@ -953,10 +931,6 @@ public int getControllerPort() {
return _controllerPort;
}

public BaseControllerStarter getControllerStarter() {
return _controllerStarter == null ? new ControllerStarter() : _controllerStarter;
}

public ControllerConf getControllerConfig() {
return _controllerConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ public class PinotHelixResourceManagerAssignmentTest extends ControllerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);

@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(ControllerConf.CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, true);
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
}

@BeforeClass
public void setUp()
throws Exception {
startZk();

Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CONTROLLER_ENABLE_TIERED_SEGMENT_ASSIGNMENT, true);
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
startController(properties);
startController();

addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, false);
addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, false);
Expand Down Expand Up @@ -116,9 +118,8 @@ private void resetServerTags() {
_helixResourceManager.createServerTenant(serverTenant);

// Create cold tenant
Tenant coldTenant =
new Tenant(TenantRole.SERVER, SERVER_COLD_TENANT_NAME, NUM_OFFLINE_COLD_SERVER_INSTANCES,
NUM_OFFLINE_COLD_SERVER_INSTANCES, 0);
Tenant coldTenant = new Tenant(TenantRole.SERVER, SERVER_COLD_TENANT_NAME, NUM_OFFLINE_COLD_SERVER_INSTANCES,
NUM_OFFLINE_COLD_SERVER_INSTANCES, 0);
_helixResourceManager.createServerTenant(coldTenant);

assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0);
Expand Down
Loading

0 comments on commit 4a428ec

Please sign in to comment.