Skip to content

Commit

Permalink
Clean up BrokerRequestHandler and BrokerResponse (apache#13179)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored May 19, 2024
1 parent d4bf8f2 commit e71d1c6
Show file tree
Hide file tree
Showing 45 changed files with 3,405 additions and 4,093 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.inject.Inject;
Expand All @@ -60,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
Expand Down Expand Up @@ -295,6 +297,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
throws Exception {
long requestArrivalTimeMs = System.currentTimeMillis();
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
Expand All @@ -311,9 +314,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
switch (sqlType) {
case DQL:
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
return _requestHandler.handleRequest(sqlRequestJson, sqlNodeAndOptions, httpRequesterIdentity,
requestStatistics, httpHeaders);
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
requestContext.setRequestArrivalTimeMillis(requestArrivalTimeMs);
return _requestHandler.handleRequest(sqlRequestJson, sqlNodeAndOptions, httpRequesterIdentity, requestContext,
httpHeaders);
} catch (Exception e) {
LOGGER.error("Error handling DQL request:\n{}\nException: {}", sqlRequestJson,
QueryException.getTruncatedStackTrace(e));
Expand Down Expand Up @@ -361,10 +365,10 @@ private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http
static Response getPinotQueryResponse(BrokerResponse brokerResponse)
throws Exception {
int queryErrorCodeHeaderValue = -1; // default value of the header.

if (brokerResponse.getExceptionsSize() != 0) {
List<QueryProcessingException> exceptions = brokerResponse.getExceptions();
if (!exceptions.isEmpty()) {
// set the header value as first exception error code value.
queryErrorCodeHeaderValue = brokerResponse.getProcessingExceptions().get(0).getErrorCode();
queryErrorCodeHeaderValue = exceptions.get(0).getErrorCode();
}

// returning the Response with OK status and header value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
Expand Down Expand Up @@ -72,8 +73,7 @@
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
Expand Down Expand Up @@ -128,7 +128,6 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected BrokerQueryEventListener _brokerQueryEventListener;

@Override
public void init(PinotConfiguration brokerConf)
Expand All @@ -139,8 +138,8 @@ public void init(PinotConfiguration brokerConf)
_clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);

PinotInsecureMode.setPinotInInsecureMode(
Boolean.valueOf(_brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf(
_brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));

if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
Expand Down Expand Up @@ -277,8 +276,7 @@ public void start()
final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
_accessControlFactory =
AccessControlFactory.loadFactory(factoryConf, _propertyStore);
_accessControlFactory = AccessControlFactory.loadFactory(factoryConf, _propertyStore);
HelixExternalViewBasedQueryQuotaManager queryQuotaManager =
new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
queryQuotaManager.init(_spectatorHelixManager);
Expand All @@ -292,49 +290,42 @@ public void start()
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);

LOGGER.info("Initializing Broker Event Listener Factory");
_brokerQueryEventListener = PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BrokerRequestHandler singleStageBrokerRequestHandler = null;
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null, _brokerQueryEventListener);
} else { // default request handler type, e.g. netty
tableCache);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = null;
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_brokerQueryEventListener);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager,
_brokerQueryEventListener);
tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}

BrokerRequestHandler multiStageBrokerRequestHandler = null;
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener);
queryQuotaManager, tableCache);
}

_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler, _brokerMetrics);
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
_brokerRequestHandler.start();

// Enable/disable thread CPU time measurement through instance config.
Expand All @@ -345,8 +336,8 @@ public void start()
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);

String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler.ServerStats;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,35 +111,27 @@ public double getLogRateLimit() {
}

private boolean shouldForceLog(QueryLogParams params) {
return params._response.isNumGroupsLimitReached() || params._response.getExceptionsSize() > 0
|| params._timeUsedMs > TimeUnit.SECONDS.toMillis(1);
return params._response.isPartialResult() || params._response.getTimeUsedMs() > TimeUnit.SECONDS.toMillis(1);
}

public static class QueryLogParams {
final long _requestId;
final String _query;
final RequestContext _requestContext;
final String _table;
final int _numUnavailableSegments;
@Nullable
final BaseBrokerRequestHandler.ServerStats _serverStats;
final ServerStats _serverStats;
final BrokerResponse _response;
final long _timeUsedMs;
@Nullable
final RequesterIdentity _requester;

public QueryLogParams(long requestId, String query, RequestContext requestContext, String table,
int numUnavailableSegments, @Nullable BaseBrokerRequestHandler.ServerStats serverStats, BrokerResponse response,
long timeUsedMs, @Nullable RequesterIdentity requester) {
_requestId = requestId;
public QueryLogParams(String query, String table, int numUnavailableSegments, @Nullable ServerStats serverStats,
BrokerResponse response, @Nullable RequesterIdentity requester) {
_query = query;
_table = table;
_timeUsedMs = timeUsedMs;
_requestContext = requestContext;
_requester = requester;
_response = response;
_serverStats = serverStats;
_numUnavailableSegments = numUnavailableSegments;
_serverStats = serverStats;
_response = response;
_requester = requester;
}
}

Expand All @@ -152,7 +143,7 @@ private enum QueryLogEntry {
REQUEST_ID("requestId") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._requestId);
builder.append(params._response.getRequestId());
}
},
TABLE("table") {
Expand All @@ -164,7 +155,7 @@ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params)
TIME_MS("timeMs") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._timeUsedMs);
builder.append(params._response.getTimeUsedMs());
}
},
DOCS("docs") {
Expand Down Expand Up @@ -215,7 +206,7 @@ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params)
BROKER_REDUCE_TIME_MS("brokerReduceTimeMs") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._requestContext.getReduceTimeMillis());
builder.append(params._response.getBrokerReduceTimeMs());
}
},
EXCEPTIONS("exceptions") {
Expand Down
Loading

0 comments on commit e71d1c6

Please sign in to comment.