1.Problem description
The current production environment has encountered incomplete data returned by using the query map function.
We have created dashboards and queries separately.
But the fundamental reason is that there is also a problem with the dashboard query results, as it involves the map function. I hope it can help solve this problem!
Core requirements:
Continuous tracking of sensitive information behavior alerts Requirement:
1) When audit records that match the behavior characteristics of continuously tracking sensitive information are found, alarm information will be displayed on the homepage. Clicking on the alarm information will show the relevant alarm details
2) Auditors can set query values through the interface, and the query values will be used as default values for the next opening (parameters such as cycle and trigger days). They will be sorted by daily granularity (for queries with the same conditions, they will be calculated once a day). Once the cumulative trigger value is reached over time, an alarm will be triggered.
Filter criteria: 'Start Date' (manually entered, plugin, default is the beginning of this month), 'End Date' (manually entered, plugin, default is the current date), 'Name' (drop-down box, can be entered), 'Department' (drop-down box, can be entered), 'Period' (input box), 'Trigger Days' (input box)
For example, if the start date is T0, the end date is TD, the cycle is N days, and the trigger days are M days, the system should calculate whether each user continuously accesses the same sensitive account more than M times from T0 to T0+N days, and then calculate the number of visits from T1 to T0+1+N days, T0+2 to T0+2+N days... T0+D to T0+D+N days (each user accessing the same sensitive account multiple times a day is recorded as 1 time, and not accumulated among different users). During this period, for each continuous visit that exceeds M times, a detail will be displayed.
Click on the specific number of visits to see the details of each visit record.
2.The sample data is
The data fields are:
"date","number","name","department","status"
2020-01-01 00:05:10 "," 00010872 "," Baoda "," Technical Department "," Yes "
Sample data generation script:
#!/bin/bash
#Define Name List
first_names=(" Zhao Qian Sun Li Zhou Wu Zheng Wang Feng Chen Chu Wei Jiang Shen Han Yang Zhu Qin You Xu He Lv Shi Zhang Kong Cao Yan Hua Jin Wei Tao Jiang Qi Xie Zou Yu Bai Shui Dou Zhang Yun Su Pan Ge Xi Fan Peng Lang Lu Wei Chang Ma Miao Feng Hua Fang Yu Ren Yuan Liu Feng Bao Shi Tang Fei Lian Cen Xue Lei He Ni Tang Teng Yin Luo Bi Hao Wu An Chang Le Shi Fu Pi Ka Qi Kang Wu Yu Yuan Bu Gu Meng Ping Huang and so on. Mu Xiao Yin Yao Shao Zhan Wang Qi Mao Yu Di Mi Bei Ming Zang Ji Fu Cheng Dai Tan Song Mao Pang Xiong Ji Shu Qu Xiang Congratulations to Dong Liang Du Ruan Lan Min Xi Ji Ma Qiang Jia Lu Lou Wei Jiang Tong Yan Guo Mei Sheng Lin Diao Zhong Xu Qiu Luo Gao Xia Cai Tian Fan Hu Ling Huo Yu Wan Zhi Ke Yang Guan Lu Mo Jing Fang Qiu Miao. Gan " Jie" Ying " Zong" Ding " Xuan" Ben " Deng" Yu " Dan" Hang " Hong" Bao " Zhu" Zuo " Shi" Cui " Ji" Niu " Gong" Cheng " Ji" Xing " Xie" Pei " Lu" Rong " Weng" Xun " Yang" Hui " Zhen" Qu " Jia" Feng "Rui" Yi " Chu" Jin " Ji" Yong " Mi" Song " Jing" Section "Fu Wu Jiao Ba Gong Mu Kui Shan Gu Che Hou Mi Peng Quan Xi Ban Yang Qiu Zhong Yi Gong Ning Qiu Luan Bao Gan Zhen Li Rong Zu Wu Fu Liu Jing Zhan Shu Long Ye Xing Si Shao Gao Li Ji Bo Yin Shen Party Zhai Tan Gong Lao Ji Shen Fu Du Ran Zai Li Yong Qu Sang Gui Pu Niu Shou Tong Bian Hu Yan Ji Jia Pu Shang Nong Wen Bie Zhuang Yan Chai Qu Yan Chong Mu Lian Ru Xi Huan Ai Yu Rong Xiang Gu Yi Shen Ge Liao Yu Zhong Ju Heng Bu Du Geng Man Hong Kuang Guo Wen Kou Guang Lu Que Dong Wu Wei Yue Kui Long Shi Gong... Nie Chao Gou Ao Rong "" Leng "" Xu "" Xin "" Kan "" Na "" Jian "" Rao "" Kong "" Zeng "" Wu "" Sha "" Ni "" Yang "" Ju "" Xu "" Feng "" Chao "" Guan "" Kuai "" Xiang "" Cha "" After "Jing" "Hong" "You" "Zhu" "Quan" "Lu" "Gai" "Yi" "Huan" "Gong" "Wan Wei" "Sima" "Shangguan" "Ouyang" "Xiahou" "Zhuge" "Wen Ren" "Dongfang" "Helian" "Huangfu" Weichi Gongyang Dantai Gongye Zongzheng Puyang Chunyu Chanyu Taishu Shentu Gongsun Zhongsun Xuanyuan Linghu Zhongli Yuwen Changsun Murong Xianyu Luqiu Situ Sikong Qiguan Sikou Zongdu Ziche Zhuansun Duanmu Wuma Gongxi Lacquer Carving Lezheng Rangsi Gongliang Tuoba Jia Gu Zaifu Gu Liang Jin Chu Yan Fa Ru Yan Tu Qin Duan Gan Bai Li Dong Guo Nan Men Hu Yan Gui Hai Yang Zhi Wei Sheng Yue " Shuai" Gou " Kang" Kuang " Kuang" Hou "" You "" Qin "" Liang Qiu "" Zuo Qiu "" East Gate "" West Gate "" Shang Mou "" She "" Xie "" Bo Shang "" Nan Gong "" Mo Ha "" Qiao "" Qiao "" Nian "" Ai "" Yang "" Tong "" Fifth "" Yan Fu "
last_names=(" Wei Gang Yong Yi Jun Feng Qiang Jun Ping Bao Dong Wen Hui Li Ming Yong Jian Shi Guang Zhi Yi Xing Liang Hai Shan Ren Bo Ning Gui Fu Sheng Long Yuan Quan Guo Sheng Xue Xiang Cai Fa Wu Xin Li Qing Fei Bin Fu Shun Xin Zi. Jie Tao Chang Cheng Kang Xing Guang Tian Da An Yan Zhong Mao Jin Lin You have "Jian" "Biao" "Bo" "Cheng" "Hui" "Si" "Qun" "Hao" "Xin" "Bang" "Cheng" "Le" "Shao" "Gong" "Song" "Shan" "Hou" "Qing" "Lei" "Min" "You" "Yu" "He" "Zhe" "Jiang" "Chao" "Hao". Liang Zheng Qian Heng Qi Gu Lun Xiang Xu Peng etc Ze "" Chen "" Chen "" Shi "" To "Build" "Home" "To" Tree "" Yan "" De "" Xing "" Time "" Tai "" Sheng "" Xiong "" Chen "" Jun "" Guan "" Ce "Teng" "Nan" "Rong" "Feng" "Hang" "Hong"
Departments=("Logistics Department" "Finance Department" "Technology Department" "Marketing Department" "Sales Department" "Human Resources Department")
#Timestamp of January 1st, 2020
start_date=$(date -d "2020-01-01" +%s)
#Current timestamp, initialized to the starting time
current_date=$start_date
#Used to record the previous number
prev_number=""
#Used to record the previous name
prev_name=""
#Used to record the previous department
prev_department=""
#Used to control whether to generate new numbers
generate_new_number=true
#Used to record the start time of continuous access
start_visit_time=0
#Output CSV header
echo "\"date\",\"number\",\"name\",\"department\",\"status\""
while true; do
#Generate random increments between 0 and 86400 seconds (random seconds within a day)
random_seconds=$((RANDOM % 86400))
current_date=$((current_date + random_seconds))
#Check if it exceeds the current time
if [ $current_date -gt $(date +%s) ]; then
break
fi
#The probability of generating a new combination is 1/5
if [ $((RANDOM % 5)) -eq 0 ]; then
generate_new_number=true
fi
if [ $generate_new_number = true ]; then
#Generate an 8-bit random number
number=$(printf "%08d" $((RANDOM % 100000000)))
prev_number=$number
#Randomly select a name
first_name=$(echo ${first_names[RANDOM % ${#first_names[@]}]})
last_name=$(echo ${last_names[RANDOM % ${#last_names[@]}]})
prev_name="$first_name$last_name"
#Randomly select departments
prev_department=$(echo ${departments[RANDOM % ${#departments[@]}]})
generate_new_number=false
start_visit_time=$current_date
else
number=$prev_number
if [ $((current_date - start_visit_time)) -gt $((7 * 86400)) ]; then
generate_new_number=true
continue
fi
fi
#Convert timestamp to date time format
full_date=$(date -d @$current_date +%Y-%m-%d\ %H:%M:%S)
#Fixed status is yes
Yes_no="Yes"
#Output CSV data rows
echo "\"$full_date\",\"$number\",\"$prev_name\",\"$prev_department\",\"$yes_no\""
done
3..Data query SPL
| makeresults
| addinfo
| table info_min_time info_max_time earliest_time latest_time
| join
[ search index=edw sourcetype=csv status="Yes"
| stats earliest(_time) as earliest_time latest(_time) as latest_time ]
| eval searchEarliestTime=if(info_min_time == "0.000",earliest_time,info_min_time)
| eval searchLatestTime=if(info_max_time="+Infinity", relative_time(latest_time,"+1d"), info_max_time)
| eval start=mvrange(searchEarliestTime, searchLatestTime, "1d")
| mvexpand start
| eval start=strftime(start, "%F 00:00:00")
| eval start=strptime(start, "%F %T")
| eval start=round(start)
| eval end=relative_time(start,"+7d")
| where end < searchLatestTime
| eval a=strftime(start, "%F")
| eval b=strftime(end, "%F")
| fields start a end b
| map search="search earliest=$start$ latest=$end$
index=edw sourcetype=csv status="Yes" | bin _time span=1d | stats dc(_time) as "cishu" by _time day name department number
| eval a=$a$ | eval b=$b$
| stats sum(cishu) as count,values(day) as "Query date" by a b name number department
" maxsearches=500000
| where count > 2
instrument panel xml
<form version="1.1">
<label>Homepage Warning Information Display (Verification)</label>
<description></description>
<search>
<query>| makeresults | addinfo | table info_min_time info_max_time earliest_time latest_time
| join [ search index=edw sourcetype=csv status="Yes" | stats earliest(_time) as earliest_time latest(_time) as latest_time ] </query>
<earliest>$query_date.earliest$</earliest>
<latest>$query_date.latest$</latest>
<progress>
<unset token="generate_time1"></unset>
</progress>
<finalized>
<eval token="searchEarliestTime1">if($result.info_min_time$ == "0.000",$result.earliest_time$,$result.info_min_time$)</eval>
<eval token="searchLatestTime1">if($result.info_max_time$="+Infinity", relative_time($result.latest_time$,"+1d"), $result.info_max_time$)</eval>
<set token="generate_time1">true</set>
</finalized>
</search>
<label>Homepage Warning Information Display (Verification)</label>
<search>
<query>| makeresults | addinfo | table info_min_time info_max_time earliest_time latest_time
| join [ search index=edw sourcetype=csv status="Yes" | stats earliest(_time) as earliest_time latest(_time) as latest_time ] </query>
<earliest>$query_date2.earliest$</earliest>
<latest>$query_date2.latest$</latest>
<progress>
<unset token="generate_time2"></unset>
</progress>
<finalized>
<eval token="searchEarliestTime2">if($result.info_min_time$ == "0.000",$result.earliest_time$,$result.info_min_time$)</eval>
<eval token="searchLatestTime2">if($result.info_max_time$="+Infinity", relative_time($result.latest_time$,"+1d"), $result.info_max_time$)</eval>
<set token="generate_time2">true</set>
</finalized>
</search>
<search>
<query>| makeresults | eval start=mvrange($searchEarliestTime1$, $searchLatestTime1$, "1d") | mvexpand start | eval end=relative_time(start,"+$time_interval$d") | where end <=$searchLatestTime1$ | stats count</query>
<progress>
<unset token="error_input1"></unset>
<unset token="display_info1"></unset>
</progress>
<finalized>
<condition match="$result.count$ > 0">
<eval token="display_info1">$result.count$</eval>
</condition>
<condition>
<eval token="error_input1">$result.count$</eval>
</condition>
</finalized>
</search>
<search>
<query>| makeresults | eval start=mvrange($searchEarliestTime2$, $searchLatestTime2$, "1d") | mvexpand start | eval end=relative_time(start,"+$time_interval_2$d") | where end <=$searchLatestTime2$ | stats count</query>
<progress>
<unset token="error_input2"></unset>
<unset token="display_info2"></unset>
</progress>
<finalized>
<condition match="$result.count$ > 0">
<eval token="display_info2">$result.count$</eval>
</condition>
<condition>
<eval token="error_input2">$result.count$</eval>
</condition>
</finalized>
</search>
<fieldset submitButton="false"></fieldset>
<row>
<panel>
<title>number of events</title>
<single>
<search>
<query>index=edw sourcetype=csv status="Yes"
| bin _time span=1d | stats count by _time
| stats sum(count) as 次数</query>
<earliest>$query_date.earliest$</earliest>
<latest>$query_date.latest$</latest>
</search>
<option name="colorMode">block</option>
<option name="drilldown">none</option>
<option name="height">128</option>
<option name="rangeColors">["0x53a051","0x53a051"]</option>
<option name="rangeValues">[0]</option>
<option name="refresh.display">progressbar</option>
<option name="useColors">1</option>
</single>
</panel>
</row>
<row>
<panel depends="$generate_time1$">
<title>Sensitive account access alert</title>
<input type="time" searchWhenChanged="true" token="query_date">
<label>Query date</label>
<default>
<earliest>@mon</earliest>
<latest>now</latest>
</default>
</input>
<input type="text" searchWhenChanged="true" token="time_interval">
<label>cycle</label>
<default>7</default>
</input>
<input type="text" searchWhenChanged="true" token="trigger">
<label>Trigger Value</label>
<default>2</default>
</input>
</panel>
</row>
<row>
<panel depends="$error_input1$">
<html>
<code>The query interval cannot be generated, please check the input</code>
</html>
</panel>
</row>
<row>
<panel depends="$display_info1$">
<single>
<search>
<query>| makeresults
| eval start=mvrange($searchEarliestTime1$, $searchLatestTime1$, "1d")
| mvexpand start
| eval end=relative_time(start,"+$time_interval$d")
| where end <=$searchLatestTime1$
| eval a=strftime(start, "%F")
| eval b=strftime(end, "%F")
| fields start a end b
| map search="search earliest=\"$$start$$\" latest=\"$$end$$\" index=edw sourcetype=csv status="Yes" | stats count by name number department " maxsearches=500000
| where count > $trigger$
| stats count</query>
<earliest>$query_date.earliest$</earliest>
<latest>$query_date.latest$</latest>
</search>
<option name="afterLabel">Example of Sensitive Account Access Alert Event</option>
<option name="beforeLabel">Found that</option>
<option name="colorBy">value</option>
<option name="colorMode">block</option>
<option name="drilldown">all</option>
<option name="linkView">search</option>
<option name="numberPrecision">0</option>
<option name="rangeColors">["0x65a637","0xd93f3c"]</option>
<option name="rangeValues">[0]</option>
<option name="refresh.display">progressbar</option>
<option name="showSparkline">1</option>
<option name="showTrendIndicator">1</option>
<option name="trendColorInterpretation">standard</option>
<option name="trendDisplayMode">absolute</option>
<option name="unitPosition">after</option>
<option name="useColors">1</option>
<option name="useThousandSeparators">1</option>
<drilldown>
<link target="_blank">search?q=%7C%20makeresults%20%0A%7C%20eval%20start%3Dmvrange($searchEarliestTime1$%2C%20$searchLatestTime1$%2C%20%221d%22)%20%0A%7C%20mvexpand%20start%20%0A%7C%20eval%20end%3Drelative_time(start%2C%22%2B$time_interval$d%22)%20%0A%7C%20where%20end%20%3C%3D$searchLatestTime1$%20%0A%7C%20eval%20a%3Dstrftime(start%2C%20%22%25F%22)%20%0A%7C%20eval%20b%3Dstrftime(end%2C%20%22%25F%22)%20%0A%7C%20fields%20start%20a%20end%20b%20%0A%7C%20map%20search%3D%22search%20earliest%3D%5C%22$$start$$%5C%22%20latest%3D%5C%22$$end$$%5C%22%20index%3Dedw%20sourcetype%3Dcsv%20status%3D%22%E6%98%AF%22%20%20%7C%20stats%20count%20%20by%20name%20number%20department%20%7C%20where%20count%20%3E$trigger$%20%20%20%22%20maxsearches%3D500000&earliest=$query_date.earliest$&latest=$query_date.latest$</link>
</drilldown>
</single>
</panel>
</row>
<row>
<panel>
<table>
<search>
<query>| makeresults
| eval start=mvrange($searchEarliestTime1$, $searchLatestTime1$, "1d")
| mvexpand start
| eval end=relative_time(start,"+$time_interval$d")
| where end <=$searchLatestTime1$
| eval a=strftime(start, "%F")
| eval b=strftime(end, "%F")
| fields start a end b
| map search="search earliest=\"$$start$$\" latest=\"$$end$$\" index=edw sourcetype=csv status="Yes" | stats count by name number department " maxsearches=500000
| where count >$trigger$</query>
<earliest>$query_date.earliest$</earliest>
<latest>$query_date.latest$</latest>
</search>
<option name="drilldown">cell</option>
<option name="refresh.display">progressbar</option>
<option name="rowNumbers">true</option>
<option name="wrap">true</option>
</table>
</panel>
</row>
<row>
<panel depends="$generate_time2$">
<title>Continuous tracking of sensitive information behavior alerts</title>
<input type="time" searchWhenChanged="true" token="query_date2">
<label>Query date</label>
<default>
<earliest>@mon</earliest>
<latest>now</latest>
</default>
</input>
<input type="text" searchWhenChanged="true" token="time_interval_2">
<label>cycle</label>
<default>7</default>
</input>
<input type="text" searchWhenChanged="true" token="trigger2">
<label>Trigger Value</label>
<default>2</default>
</input>
</panel>
</row>
<row>
<panel depends="$error_input2$">
<html>
<code>The query interval cannot be generated, please check the input</code>
</html>
</panel>
</row>
<row>
<panel depends="$display_info2$">
<single>
<search>
<query>| makeresults
| eval start=mvrange($searchEarliestTime2$, $searchLatestTime2$, "1d")
| mvexpand start
| eval end=relative_time(start,"+$time_interval_2$d")
| eval alert_date=relative_time(end,"+1d")
| where end <=$searchLatestTime2$
| eval a=strftime(start, "%F")
| eval b=strftime(end, "%F")
| eval c=strftime(alert_date,"%F")
| fields start a end b c
| map search="search earliest=\"$$start$$\" latest=\"$$end$$\"
index=edw sourcetype=csv status="Yes" | bin _time span=1d | stats dc(_time) as "cishu" by _time day name department number
| eval a=$$a$$ | eval b=$$b$$ | eval c=$$c$$
| stats sum(cishu) as count,values(day) as "Query date" by a b c name number department
" maxsearches=500000
| where count > $trigger2$
| stats count</query>
<earliest>$query_date2.earliest$</earliest>
<latest>$query_date2.latest$</latest>
</search>
<option name="afterLabel">Example "User Continuous Tracking Behavior" Event</option>
<option name="beforeLabel">Found that</option>
<option name="colorBy">value</option>
<option name="colorMode">block</option>
<option name="drilldown">all</option>
<option name="linkView">search</option>
<option name="numberPrecision">0</option>
<option name="rangeColors">["0x65a637","0xd93f3c"]</option>
<option name="rangeValues">[0]</option>
<option name="refresh.display">progressbar</option>
<option name="showSparkline">1</option>
<option name="showTrendIndicator">1</option>
<option name="trendColorInterpretation">standard</option>
<option name="trendDisplayMode">absolute</option>
<option name="unitPosition">after</option>
<option name="useColors">1</option>
<option name="useThousandSeparators">1</option>
<drilldown>
<link target="_blank">search?q=%7C%20makeresults%20%0A%7C%20eval%20start%3Dmvrange($searchEarliestTime2$%2C%20$searchLatestTime2$%2C%20%221d%22)%20%0A%7C%20mvexpand%20start%20%0A%7C%20eval%20end%3Drelative_time(start%2C%22%2B$time_interval_2$d%22)%20%0A%7C%20eval%20alert_date%3Drelative_time(end%2C%22%2B1d%22)%0A%7C%20where%20end%20%3C%3D$searchLatestTime2$%20%0A%7C%20eval%20a%3Dstrftime(start%2C%20%22%25F%22)%20%0A%7C%20eval%20b%3Dstrftime(end%2C%20%22%25F%22)%20%0A%7C%20eval%20c%3Dstrftime(alert_date%2C%22%25F%22)%20%0A%7C%20fields%20start%20a%20end%20b%20c%0A%7C%20map%20search%3D%22search%20earliest%3D%5C%22$$start$$%5C%22%20latest%3D%5C%22$$end$$%5C%22%20%20%0Aindex%3Dedw%20sourcetype%3Dcsv%20status%3D%22%E6%98%AF%22%20%20%20%7C%20bin%20_time%20span%3D1d%20%20%7C%20stats%20dc(_time)%20as%20%22%E8%AE%BF%E9%97%AE%E6%95%8F%E6%84%9F%E8%B4%A6%E6%88%B7%E6%AC%A1%E6%95%B0%22%20by%20%20_time%20day%20name%20department%20number%0A%20%20%20%20%7C%20eval%20a%3D$$a$$%20%20%7C%20eval%20b%3D$$b$$%20%7C%20eval%20c%3D$$c$$%0A%7C%20stats%20sum(%E8%AE%BF%E9%97%AE%E6%95%8F%E6%84%9F%E8%B4%A6%E6%88%B7%E6%AC%A1%E6%95%B0)%20as%20count%2Cvalues(day)%20as%20%22%E6%9F%A5%E8%AF%A2%E6%97%A5%E6%9C%9F%22%20by%20a%20b%20c%20name%20number%20department%0A%22%20maxsearches%3D500000%20%0A%7C%20where%20count%20%3E%20$trigger2$&earliest=$query_date.earliest$&latest=$query_date.latest$</link>
</drilldown>
</single>
</panel>
</row>
<row>
<panel>
<table>
<search>
<query>| makeresults
| eval start=mvrange($searchEarliestTime2$, $searchLatestTime2$, "1d")
| mvexpand start
| eval end=relative_time(start,"+$time_interval_2$d")
| eval alert_date=relative_time(end,"+1d")
| where end <=$searchLatestTime2$
| eval a=strftime(start, "%F")
| eval b=strftime(end, "%F")
| eval c=strftime(alert_date,"%F")
| fields start a end b c
| map search="search earliest=\"$$start$$\" latest=\"$$end$$\"
index=edw sourcetype=csv status="Yes" | bin _time span=1d | stats dc(_time) as "cishu" by _time day name department number
| eval a=$$a$$ | eval b=$$b$$ | eval c=$$c$$
| stats sum(cishu) as count,values(day) as "Query date" by a b c name number department
" maxsearches=500000
| where count > $trigger2$</query>
<earliest>$query_date2.earliest$</earliest>
<latest>$query_date2.latest$</latest>
</search>
<option name="drilldown">cell</option>
<option name="refresh.display">progressbar</option>
</table>
</panel>
</row>
</form>
4.Problem reproduction
There is a problem of data loss in cross year queries, such as querying 23-12-1 24-1-1, as shown in the following figure:
For example: 12/01/2023-06/30/2024
Creating dashboards and query statements will result in more results than temporary queries
Scenario 1: Query on 12/01/2023-06/30/2024
Result: There were 12 cases on the dashboard, 2 more than temporary queries
Scenario 2: Query Example: 12/01/2023-06/30/2024
187 dashboard cases, 1 more than temporary queries