Skip to content

Commit

Permalink
app-layer: track modified/processed txs
Browse files Browse the repository at this point in the history
To optimize detection, and logging, to avoid going through
all the live transactions when only a few were modified.

Two boolean fields are added to the tx data: updated_tc and ts
The app-layer parsers are now responsible to set these when
needed, and the logging and detection uses them to skip
transactions that were not updated.

There may some more optimization remaining by when we set
both updated_tc and updated_ts in functions returning
a mutable transaction, by checking if all the callers
are called in one direction only (request or response)

Ticket: 7087
(cherry picked from commit b02557a)
  • Loading branch information
catenacyber authored and victorjulien committed Feb 26, 2025
1 parent 05bf4a8 commit 782f35c
Show file tree
Hide file tree
Showing 23 changed files with 131 additions and 18 deletions.
16 changes: 13 additions & 3 deletions rust/src/applayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ pub struct AppLayerTxData {
/// config: log flags
pub config: AppLayerTxConfig,

/// The tx has been updated and needs to be processed : detection, logging, cleaning
/// It can then be skipped until new data arrives.
/// There is a boolean for both directions : to server and to client
pub updated_tc: bool,
pub updated_ts: bool,

/// logger flags for tx logging api
logged: LoggerFlags,

Expand Down Expand Up @@ -152,6 +158,8 @@ impl AppLayerTxData {
files_stored: 0,
file_flags: 0,
file_tx: 0,
updated_tc: true,
updated_ts: true,
detect_flags_ts: 0,
detect_flags_tc: 0,
de_state: std::ptr::null_mut(),
Expand All @@ -162,9 +170,9 @@ impl AppLayerTxData {
/// Create new AppLayerTxData for a transaction in a single
/// direction.
pub fn for_direction(direction: Direction) -> Self {
let (detect_flags_ts, detect_flags_tc) = match direction {
Direction::ToServer => (0, APP_LAYER_TX_SKIP_INSPECT_FLAG),
Direction::ToClient => (APP_LAYER_TX_SKIP_INSPECT_FLAG, 0),
let (detect_flags_ts, detect_flags_tc, updated_ts, updated_tc) = match direction {
Direction::ToServer => (0, APP_LAYER_TX_SKIP_INSPECT_FLAG, true, false),
Direction::ToClient => (APP_LAYER_TX_SKIP_INSPECT_FLAG, 0, false, true),
};
Self {
config: AppLayerTxConfig::new(),
Expand All @@ -174,6 +182,8 @@ impl AppLayerTxData {
files_stored: 0,
file_flags: 0,
file_tx: 0,
updated_tc,
updated_ts,
detect_flags_ts,
detect_flags_tc,
de_state: std::ptr::null_mut(),
Expand Down
1 change: 1 addition & 0 deletions rust/src/applayertemplate/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl TemplateState {
start = rem;

if let Some(tx) = self.find_request() {
tx.tx_data.updated_tc = true;
tx.response = Some(response);
SCLogNotice!("Found response for request:");
SCLogNotice!("- Request: {:?}", tx.request);
Expand Down
4 changes: 4 additions & 0 deletions rust/src/dcerpc/dcerpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ impl DCERPCState {
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.req_done || !tx_old.resp_done {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
tx_old.req_done = true;
tx_old.resp_done = true;
break;
Expand Down Expand Up @@ -537,6 +539,8 @@ impl DCERPCState {
}
}
}
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
4 changes: 4 additions & 0 deletions rust/src/dcerpc/dcerpc_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ impl DCERPCUDPState {
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.req_done || !tx_old.resp_done {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
tx_old.req_done = true;
tx_old.resp_done = true;
break;
Expand Down Expand Up @@ -165,6 +167,8 @@ impl DCERPCUDPState {
}

if let Some(tx) = otx {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
let done = (hdr.flags1 & PFCL1_FRAG) == 0 || (hdr.flags1 & PFCL1_LASTFRAG) != 0;

match hdr.pkt_type {
Expand Down
4 changes: 4 additions & 0 deletions rust/src/http2/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,8 @@ impl HTTP2State {
let tx = &mut self.transactions[index - 1];
tx.tx_data.update_file_flags(self.state_data.file_flags);
tx.update_file_flags(tx.tx_data.file_flags);
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
} else {
// do not use SETTINGS_MAX_CONCURRENT_STREAMS as it can grow too much
Expand All @@ -667,6 +669,8 @@ impl HTTP2State {
tx_old.set_event(HTTP2Event::TooManyStreams);
// use a distinct state, even if we do not log it
tx_old.state = HTTP2TransactionState::HTTP2StateTodrop;
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
}
return None;
}
Expand Down
8 changes: 8 additions & 0 deletions rust/src/modbus/modbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ impl ModbusState {
for tx in &mut self.transactions {
if let Some(req) = &tx.request {
if tx.response.is_none() && resp.matches(req) {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand All @@ -139,6 +141,8 @@ impl ModbusState {
for tx in &mut self.transactions {
if let Some(resp) = &tx.response {
if tx.request.is_none() && req.matches(resp) {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down Expand Up @@ -184,6 +188,8 @@ impl ModbusState {
match self.find_response_and_validate(&mut msg) {
Some(tx) => {
tx.set_events_from_flags(&msg.error_flags);
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
tx.request = Some(msg);
}
None => {
Expand All @@ -210,6 +216,8 @@ impl ModbusState {
} else {
tx.set_events_from_flags(&msg.error_flags);
}
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
tx.response = Some(msg);
}
None => {
Expand Down
4 changes: 4 additions & 0 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl MQTTState {
if !tx.complete {
if let Some(mpktid) = tx.pkt_id {
if mpktid == pkt_id {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand All @@ -196,6 +198,8 @@ impl MQTTState {
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.complete {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
tx_old.complete = true;
MQTTState::set_event(tx_old, MQTTEvent::TooManyTransactions);
break;
Expand Down
6 changes: 6 additions & 0 deletions rust/src/nfs/nfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ impl NFSState {
// set at least one another transaction to the drop state
for tx_old in &mut self.transactions {
if !tx_old.request_done || !tx_old.response_done {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
tx_old.request_done = true;
tx_old.response_done = true;
tx_old.is_file_closed = true;
Expand Down Expand Up @@ -500,6 +502,8 @@ impl NFSState {
pub fn mark_response_tx_done(&mut self, xid: u32, rpc_status: u32, nfs_status: u32, resp_handle: &[u8])
{
if let Some(mytx) = self.get_tx_by_xid(xid) {
mytx.tx_data.updated_tc = true;
mytx.tx_data.updated_ts = true;
mytx.response_done = true;
mytx.rpc_response_status = rpc_status;
mytx.nfs_response_status = nfs_status;
Expand Down Expand Up @@ -756,6 +760,8 @@ impl NFSState {
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
SCLogDebug!("Found NFS file TX with ID {} XID {:04X}", tx.id, tx.xid);
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
4 changes: 4 additions & 0 deletions rust/src/pgsql/pgsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ impl PgsqlState {
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if tx_old.tx_res_state < PgsqlTxProgress::TxDone {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
// we don't check for TxReqDone for the majority of requests are basically completed
// when they're parsed, as of now
tx_old.tx_req_state = PgsqlTxProgress::TxFlushedOut;
Expand Down Expand Up @@ -361,6 +363,7 @@ impl PgsqlState {
// A simplified finite state machine for PostgreSQL v3 can be found at:
// https://samadhiweb.com/blog/2013.04.28.graphviz.postgresv3.html
if let Some(tx) = self.find_or_create_tx() {
tx.tx_data.updated_ts = true;
tx.request = Some(request);
if let Some(state) = new_state {
if Self::request_is_complete(state) {
Expand Down Expand Up @@ -519,6 +522,7 @@ impl PgsqlState {
self.state_progress = state;
}
if let Some(tx) = self.find_or_create_tx() {
tx.tx_data.updated_tc = true;
if tx.tx_res_state == PgsqlTxProgress::TxInit {
tx.tx_res_state = PgsqlTxProgress::TxReceived;
}
Expand Down
8 changes: 7 additions & 1 deletion rust/src/rfb/rfb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ impl RFBState {

fn get_current_tx(&mut self) -> Option<&mut RFBTransaction> {
let tx_id = self.tx_id;
self.transactions.iter_mut().find(|tx| tx.tx_id == tx_id)
let r = self.transactions.iter_mut().find(|tx| tx.tx_id == tx_id);
if let Some(tx) = r {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
return None;
}

fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
Expand Down
2 changes: 2 additions & 0 deletions rust/src/smb/dcerpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl SMBState {
_ => { false },
};
if found {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
4 changes: 4 additions & 0 deletions rust/src/smb/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ impl SMBState {
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
}
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand All @@ -152,6 +154,8 @@ impl SMBState {
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
}
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/src/smb/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ impl SMBState {
_ => { false },
};
if hit {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
10 changes: 10 additions & 0 deletions rust/src/smb/smb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ impl SMBState {
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.request_done || !tx_old.response_done {
tx_old.tx_data.updated_tc = true;
tx_old.tx_data.updated_ts = true;
tx_old.request_done = true;
tx_old.response_done = true;
tx_old.set_event(SMBEvent::TooManyTransactions);
Expand Down Expand Up @@ -924,6 +926,8 @@ impl SMBState {
false
};
if found {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand All @@ -948,6 +952,8 @@ impl SMBState {
false
};
if found {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down Expand Up @@ -986,6 +992,8 @@ impl SMBState {
_ => { false },
};
if found {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down Expand Up @@ -1019,6 +1027,8 @@ impl SMBState {
_ => { false },
};
if hit {
tx.tx_data.updated_tc = true;
tx.tx_data.updated_ts = true;
return Some(tx);
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/src/ssh/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ pub unsafe extern "C" fn rs_ssh_parse_request(
let state = &mut cast_pointer!(state, SSHState);
let buf = stream_slice.as_slice();
let hdr = &mut state.transaction.cli_hdr;
state.transaction.tx_data.updated_ts = true;
if hdr.flags < SSHConnectionState::SshStateBannerDone {
return state.parse_banner(buf, false, pstate);
} else {
Expand All @@ -375,6 +376,7 @@ pub unsafe extern "C" fn rs_ssh_parse_response(
let state = &mut cast_pointer!(state, SSHState);
let buf = stream_slice.as_slice();
let hdr = &mut state.transaction.srv_hdr;
state.transaction.tx_data.updated_tc = true;
if hdr.flags < SSHConnectionState::SshStateBannerDone {
return state.parse_banner(buf, true, pstate);
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/app-layer-dnp3.c
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,7 @@ static void DNP3HandleUserDataRequest(DNP3State *dnp3, const uint8_t *input,
/* Update the saved transport header so subsequent segments
* will be matched to this sequence number. */
tx->th = th;
tx->tx_data.updated_ts = true;
}
else {
ah = (DNP3ApplicationHeader *)(input + sizeof(DNP3LinkHeader) +
Expand Down Expand Up @@ -971,6 +972,7 @@ static void DNP3HandleUserDataResponse(DNP3State *dnp3, const uint8_t *input,
/* Replace the transport header in the transaction with this
* one in case there are more frames. */
tx->th = th;
tx->tx_data.updated_tc = true;
}
else {
ah = (DNP3ApplicationHeader *)(input + offset);
Expand Down
7 changes: 6 additions & 1 deletion src/app-layer-ftp.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ static AppLayerResult FTPParseResponse(Flow *f, void *ftp_state, AppLayerParserS
SCReturnStruct(APP_LAYER_ERROR);
}
lasttx = tx;
tx->tx_data.updated_tc = true;
if (state->command == FTP_COMMAND_UNKNOWN || tx->command_descriptor == NULL) {
/* unknown */
tx->command_descriptor = &FtpCommands[FTP_COMMAND_MAX - 1];
Expand Down Expand Up @@ -1008,7 +1009,11 @@ static AppLayerResult FTPDataParse(Flow *f, FtpDataState *ftpdata_state,
SCTxDataUpdateFileFlags(&ftpdata_state->tx_data, ftpdata_state->state_data.file_flags);
if (ftpdata_state->tx_data.file_tx == 0)
ftpdata_state->tx_data.file_tx = direction & (STREAM_TOSERVER | STREAM_TOCLIENT);

if (direction & STREAM_TOSERVER) {
ftpdata_state->tx_data.updated_ts = true;
} else {
ftpdata_state->tx_data.updated_tc = true;
}
/* we depend on detection engine for file pruning */
const uint16_t flags = FileFlowFlagsToFlags(ftpdata_state->tx_data.file_flags, direction);
int ret = 0;
Expand Down
Loading

0 comments on commit 782f35c

Please sign in to comment.