Greetings all.
We bumped into this problem too, which I would define as this:
Problem definition
If your volume of events is large and you don't constrain your query somehow, a bump in volume causes you to get behind. The backlog increases your query time until the query no longer finishes within the timeout, ensuring that you will never get logs again without manual intervention.
Limitations of the ReceivedUTC constraint
The ReceivedUTC approach has a huge drawback, in that if you ever miss logs for a day, you will never get them indexed unless you notice and either fiddle your input or do one-time pulls.
Our requirements for a solution
The solution we were looking for needed to be able to query in constant time, because if we got behind for whatever reason (a DB connectivity problem, for example), we needed it to catch up without query modification or manual intervention. Our solution, then, needed to constrain the query based not on datetime, but on a fixed range of AutoIDs that we knew the general query time for.
The solution always needed to get the oldest records, so that if there was an interruption, it would catch up from where it left off (which is the whole point of a rising column, right?).
Solution
Determine how large a range of AutoIDs a query can return within the timeout, even when the range is non-sparse (i.e., most AutoIDs return a record). We landed on 2 million, for which a full query took no more than 200 seconds to return.
Use a variable for checkpoint (we chose @checkpoint), so that we can use it more than once in the query to constrain the query to be greater than the checkpoint value and less than (checkpoint plus the number we just figured out).
Choose comfortable cushions, so that even if the DB is crazy-loaded, it will probably return within your timeout. We chose double the 200 seconds we measured, so 400 seconds, for our query timeout, and 600 seconds for the run interval.
Choose high-volume high-performance settings for max rows and fetch size, so that if you do get behind, you catch up stably and quickly.
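To make the steps above concrete, here is a minimal sketch of the fixed-window idea in Python against an in-memory SQLite table. It is not our production setup: the `events` table, the tiny `WINDOW` and `MAX_ROWS` values, and the `make_db`, `poll_once`, and `drain` helpers are all made up for illustration, and the checkpoint is tracked in a local variable rather than by the input itself.

```python
import sqlite3

WINDOW = 5     # stand-in for the 2,000,000-AutoID window
MAX_ROWS = 3   # stand-in for max_rows


def make_db(ids):
    """Create an in-memory table holding the given AutoIDs (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (AutoID INTEGER PRIMARY KEY, msg TEXT)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)", [(i, f"event {i}") for i in ids]
    )
    return conn


def poll_once(conn, checkpoint):
    """One run: fetch the oldest rows inside the fixed window above the checkpoint."""
    rows = conn.execute(
        "SELECT AutoID, msg FROM events "
        "WHERE AutoID > ? AND AutoID < (? + ?) "
        "ORDER BY AutoID ASC LIMIT ?",
        (checkpoint, checkpoint, WINDOW, MAX_ROWS),
    ).fetchall()
    # The new checkpoint is the highest AutoID actually returned.
    return rows, (rows[-1][0] if rows else checkpoint)


def drain(conn, checkpoint=0):
    """Poll until a run comes back empty; return batch sizes and the final checkpoint."""
    sizes = []
    while True:
        rows, checkpoint = poll_once(conn, checkpoint)
        if not rows:
            return sizes, checkpoint
        sizes.append(len(rows))


if __name__ == "__main__":
    backlog = make_db([1, 2, 3, 4, 6, 7, 9, 10, 11, 12])
    print(drain(backlog))
```

Every run touches at most `WINDOW` AutoIDs no matter how large the backlog is, so the per-run cost stays bounded while the checkpoint walks forward from the oldest record. Note that this sketch treats an empty window as "caught up".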
Here's what we use, and it works like a champ. Hell, two champions.
db_inputs.conf (the important settings)
interval = 600
max_rows = 1000000
mode = rising
fetch_size = 1000
query_timeout = 400
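For a rough feel of what these settings allow, the arithmetic can be sketched in Python. This assumes that when a window is dense, each run returns at most max_rows rows and the checkpoint moves to the last row returned; the backlog and inflow figures in the usage lines are invented, and `days_to_catch_up` is a hypothetical helper.

```python
INTERVAL_S = 600          # interval from the settings above
WINDOW_IDS = 2_000_000    # AutoID window used in the query
MAX_ROWS = 1_000_000      # max_rows from the settings above
SECONDS_PER_DAY = 86_400

runs_per_day = SECONDS_PER_DAY // INTERVAL_S   # 144 runs
max_ids_per_day = runs_per_day * WINDOW_IDS    # AutoIDs scanned when windows are sparse
max_rows_per_day = runs_per_day * MAX_ROWS     # rows indexed when windows are dense


def days_to_catch_up(backlog_rows, new_rows_per_day):
    """Days needed to drain a backlog, or None if inflow meets or exceeds capacity."""
    spare = max_rows_per_day - new_rows_per_day
    if spare <= 0:
        return None
    return backlog_rows / spare


if __name__ == "__main__":
    print(runs_per_day, max_ids_per_day, max_rows_per_day)
    # Invented figures: 50M rows behind, 44M new rows arriving per day.
    print(days_to_catch_up(50_000_000, 44_000_000))
```

If your steady-state volume is anywhere near the daily ceiling, a shorter interval or a larger window (re-measured against the timeout) would be the knobs to revisit.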
QUERY
DECLARE @checkpoint bigint = ?
SELECT
[EPOEvents].[ReceivedUTC] as [timestamp],
[EPOEvents].[AutoID],
[EPOEvents].[ThreatName] as [signature],
[EPOEvents].[ThreatType] as [threat_type],
[EPOEvents].[ThreatEventID] as [signature_id],
[EPOEvents].[ThreatCategory] as [category],
[EPOEvents].[ThreatSeverity] as [severity_id],
[EPOEventFilterDesc].[Name] as [event_description],
[EPOEvents].[DetectedUTC] as [detected_timestamp],
[EPOEvents].[TargetFileName] as [file_name],
[EPOEvents].[AnalyzerDetectionMethod] as [detection_method],
[EPOEvents].[ThreatActionTaken] as [vendor_action],
CAST([EPOEvents].[ThreatHandled] as int) as [threat_handled],
[EPOEvents].[TargetUserName] as [logon_user],
[EPOComputerProperties].[UserName] as [user],
[EPOComputerProperties].[DomainName] as [dest_nt_domain],
[EPOEvents].[TargetHostName] as [dest_dns],
[EPOEvents].[TargetHostName] as [dest_nt_host],
[EPOComputerProperties].[IPHostName] as [fqdn],
[dest_ip] = ( convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOComputerProperties].[IPV4x] + 2147483648))),1,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOComputerProperties].[IPV4x] + 2147483648))),2,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOComputerProperties].[IPV4x] + 2147483648))),3,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOComputerProperties].[IPV4x] + 2147483648))),4,1))) ),
[EPOComputerProperties].[SubnetMask] as [dest_netmask],
[EPOComputerProperties].[NetAddress] as [dest_mac],
[EPOComputerProperties].[OSType] as [os],
[EPOComputerProperties].[OSServicePackVer] as [sp],
[EPOComputerProperties].[OSVersion] as [os_version],
[EPOComputerProperties].[OSBuildNum] as [os_build],
[EPOComputerProperties].[TimeZone] as [timezone],
[EPOEvents].[SourceHostName] as [src_dns],
[src_ip] = ( convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOEvents].[SourceIPV4] + 2147483648))),1,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOEvents].[SourceIPV4] + 2147483648))),2,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOEvents].[SourceIPV4] + 2147483648))),3,1)))+'.'+convert(varchar(3),convert(tinyint,substring(convert(varbinary(4),convert(bigint,([EPOEvents].[SourceIPV4] + 2147483648))),4,1))) ),
[EPOEvents].[SourceMAC] as [src_mac],
[EPOEvents].[SourceProcessName] as [process],
[EPOEvents].[SourceURL] as [url],
[EPOEvents].[SourceUserName] as [source_logon_user],
[EPOComputerProperties].[IsPortable] as [is_laptop],
[EPOEvents].[AnalyzerName] as [product],
[EPOEvents].[AnalyzerVersion] as [product_version],
[EPOEvents].[AnalyzerEngineVersion] as [engine_version],
[EPOEvents].[AnalyzerDATVersion] as [dat_version],
[EPOProdPropsView_VIRUSCAN].[datver] as [vse_dat_version],
[EPOProdPropsView_VIRUSCAN].[enginever64] as [vse_engine64_version],
[EPOProdPropsView_VIRUSCAN].[enginever] as [vse_engine_version],
[EPOProdPropsView_VIRUSCAN].[hotfix] as [vse_hotfix],
[EPOProdPropsView_VIRUSCAN].[productversion] as [vse_product_version],
[EPOProdPropsView_VIRUSCAN].[servicepack] as [vse_sp]
FROM [EPOEvents]
LEFT JOIN [EPOLeafNode] ON [EPOEvents].[AgentGUID] = [EPOLeafNode].[AgentGUID]
LEFT JOIN [EPOProdPropsView_VIRUSCAN] ON [EPOLeafNode].[AutoID] = [EPOProdPropsView_VIRUSCAN].[LeafNodeID]
LEFT JOIN [EPOComputerProperties] ON [EPOLeafNode].[AutoID] = [EPOComputerProperties].[ParentID]
LEFT JOIN [EPOEventFilterDesc] ON [EPOEvents].[ThreatEventID] = [EPOEventFilterDesc].[EventId]
AND ([EPOEventFilterDesc].[Language]='0409')
WHERE [EPOEvents].[AutoID] > @checkpoint AND [EPOEvents].[AutoID] > 274560304 AND [EPOEvents].[AutoID] < (@checkpoint + 2000000)
ORDER BY [EPOEvents].[AutoID] ASC
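The long [dest_ip] and [src_ip] expressions in the query are easier to follow outside SQL. They add 2147483648 to the stored signed value and then print the four resulting bytes as a dotted quad. A small Python sketch of the same conversion, useful for spot-checking a value by hand (`epo_ipv4_to_dotted` is a name made up here, not anything from ePO):

```python
import ipaddress

OFFSET = 2**31  # 2147483648, the constant added in the SQL expressions


def epo_ipv4_to_dotted(stored):
    """Convert a stored signed IPv4 value to dotted-quad text; None stays None."""
    if stored is None:
        return None
    return str(ipaddress.IPv4Address(stored + OFFSET))


if __name__ == "__main__":
    print(epo_ipv4_to_dotted(1084752138))
```

The function raises an error for values outside the signed 32-bit range, which is a stricter behavior than the SQL and is a choice made for this sketch.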