All Posts

You need to look at how the input is processed and at the definitions in inputs.conf/props.conf for that linux sourcetype. As you can see in that event example, the time in the log message is 15:04:57, but the time in the Splunk event is 15:03:57, i.e. a minute earlier - so the timestamp in the message is not the one being used when the data is ingested. I am not familiar with SC4S, but has someone written a parser for that data? If so, the issue may be at the SC4S parsing end.
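If the timestamp does turn out to be taken from the wrong part of the event, an explicit timestamp instruction in props.conf on the parsing tier is the usual fix. A minimal sketch, assuming a classic syslog layout - the sourcetype name, the TIME_PREFIX regex, and the lookahead value are all assumptions to adapt to your actual data:

[linux:syslog]
# Skip past "MMM dd " so the HH:MM:SS in the message body is used (hypothetical prefix)
TIME_PREFIX = ^\w{3}\s+\d+\s
TIME_FORMAT = %H:%M:%S
MAX_TIMESTAMP_LOOKAHEAD = 10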
Actually, yes! I do have to add the fields pipe on the base search. I tried to add it on the chain also, but it does not work. And also, yes, I do have rename in the chain search, while the spath is in the main search. Interesting read on the forum you shared, indeed. I'll be careful for now about parsing data between searches. P.S. For name, in the end I have to mvzip name and status, mvjoin, find the latest one, then use rex to extract the values out. It's complicated and time-costly, but it works for now, so I think I'm just going to let it be.
I assume that there is a typo in your MaxPeakTPS in the eventstats command and in your use of peakTPS in the following stats, and also in the use of peakTime, which does not exist as a field. You can do this

| timechart span=1s count AS TPS
``` Calculate min and max TPS ```
| eventstats max(TPS) as max_TPS min(TPS) as min_TPS
``` Now work out average TPS, actual min and max TPS and then the first occurrence of the min/max TPS ```
| stats avg(TPS) as avgTPS values(*_TPS) as *_TPS min(eval(if(TPS=max_TPS, _time, null()))) as maxTime min(eval(if(TPS=min_TPS, _time, null()))) as minTime
| fieldformat maxTime=strftime(maxTime,"%x %X")
| fieldformat minTime=strftime(minTime,"%x %X")

The min(eval(... statements just look for the first _time when TPS is either min or max, to get the earliest time these occurred. Note the use of the field naming convention min_TPS/max_TPS, which allows the use of wildcards in the stats.
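For readers less familiar with SPL, the earliest-min/earliest-max logic above can be sketched in plain Python (the sample counts are made up for illustration):

```python
# Per-second event counts keyed by epoch time (illustrative sample data).
tps_by_time = {100: 5, 101: 9, 102: 3, 103: 9, 104: 3}

max_tps = max(tps_by_time.values())
min_tps = min(tps_by_time.values())
avg_tps = sum(tps_by_time.values()) / len(tps_by_time)

# Earliest _time at which the max/min TPS occurred, mirroring
# min(eval(if(TPS=max_TPS, _time, null()))) in the SPL above.
max_time = min(t for t, tps in tps_by_time.items() if tps == max_tps)
min_time = min(t for t, tps in tps_by_time.items() if tps == min_tps)
```

Even though the max TPS of 9 occurs twice (at 101 and 103), only the first occurrence is kept, which is exactly what the min() around the eval achieves.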
Every time I create a table visualization, I notice that the value 0 is always aligned on the left side while the rest are aligned on the right side (322, 3483, 0, 0 are in the same column). Is there any reason behind this, and any way to fix it? Thanks!
You are still rounding UP with your floor(Score)+1 - is that what you intended?
You can give these evals a go. I would check to make sure you are getting everything extracted as expected. I don't have access to any sourcetype="mscs:nsg:flow" data at the moment, so I am just using simulated data based off of your screenshots. If you are happy with the output, then you could add them as calculated fields in local/props.conf (I would make sure that they don't step on any existing knowledge objects in the app though).

| eval time=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0))), 'time')
| eval src_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1))), 'src_ip')
| eval dst_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2))), 'dst_ip')
| eval src_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3))), 'src_port')
| eval dst_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4))), 'dst_port')
| eval protocol=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5))), 'protocol')
| eval traffic_flow=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6))), 'traffic_flow')
| eval traffic_result=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7))), 'traffic_result')

Also, I'm not sure whether there are ever events formatted slightly differently because only a single flow occurred and it is no longer an array in the JSON event, which would change the overall extracted field name to something like "records{}.properties.flows{}.flows.flowTuples{}". From a look at the microsoft_azure app configs, it looks like it's only ever referencing "records{}.properties.flows{}.flows{}.flowTuples{}" for its extractions, so I just made the assumption that events will be formatted this way.
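To illustrate what those evals do outside SPL, here is a rough Python equivalent that splits every flow tuple rather than only the first one. The tuple layout follows the FIELDS order from the app's extract_tuple stanza; the sample tuple values are made up:

```python
FIELDS = ["time", "src_ip", "dst_ip", "src_port", "dst_port",
          "protocol", "traffic_flow", "traffic_result"]

# Two made-up flow tuples as they would appear in the multivalue field
# records{}.properties.flows{}.flows{}.flowTuples{}.
flow_tuples = [
    "1542110377,10.0.0.4,13.67.143.118,44931,443,T,O,A",
    "1542110379,10.0.0.4,13.67.143.117,44932,443,T,O,D",
]

# Build one multivalue-style list per field, across ALL tuples --
# the behavior the mvmap() branch of the evals above reproduces in SPL.
extracted = {name: [] for name in FIELDS}
for tup in flow_tuples:
    for name, value in zip(FIELDS, tup.split(",")):
        extracted[name].append(value)
```

The DELIMS-based extraction in the app only ever yields the first tuple's values, which is why the question's src_ip/dst_ip fields are single-valued; the loop above (like mvmap in the SPL) visits every tuple.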
Hi, We are ingesting Azure NSG flow logs and visualizing them using the Microsoft Azure App for Splunk https://splunkbase.splunk.com/app/4882 Data is in JSON format with multiple levels/records in a single event. Each record can have multiple flows, flow tuples, etc. Adding a few screenshots here to give the context. Default extractions for the main JSON fields look fine, but when it comes to values within the flow tuple field, i.e. records{}.properties.flows{}.flows{}.flowTuples{}, Splunk only keeps values from the very first entry. How can I make these src_ip, dst_ip fields also get multiple values (across all records/flow tuples, etc.)? Splunk extracts values only from that first highlighted entry. Here is the extraction logic from this app:

[extract_tuple]
SOURCE_KEY = records{}.properties.flows{}.flows{}.flowTuples{}
DELIMS = ","
FIELDS = time,src_ip,dst_ip,src_port,dst_port,protocol,traffic_flow,traffic_result

Thanks,
@VatsalJagani  I'm trying to work on setting up auto import for CSV file into a metric index. How to schedule this process? Any insights or examples would be greatly appreciated.
We use the free version of syslog-ng, and recently we had a requirement to add TLS on top of TCP, and we don't have the knowledge to implement it. Therefore we wonder whether anybody has migrated from syslog-ng to SC4S to help with more advanced requirements such as TCP/TLS? https://splunkbase.splunk.com/app/4740 If so, what lessons were learned, and what motivated the move?
One hint for home directories: I never create the home directory as /opt/splunk or similar for any service user unless that's mandated by the service/program. It's much easier to manage the service when you can keep your own stuff outside of the distribution directories. It's also easy to take temporary conf backups etc. under home.
Where is the data from the Splunk Enterprise Security (ES) Investigation Panel stored? In the previous version, it seemed to be stored in a KV lookup, but I can't find it in the current 7.x version. I understand that the Notable index holds information related to incidents from the Incident Review Dashboard. How can we map Splunk Notables and their Investigations together to generate a comprehensive report in the current 7.x ES version?
I'm also glad I found this thread; I only wish it had been sooner. I just ran into this today when I saw a forwarder on RHEL 7 update from splunkforwarder-9.0.2-17e00c557dc1.x86_64 to splunkforwarder-9.1.2-b6b9c8185839.x86_64.

In my environment, and with our Ansible automation, we pre-create the splunk user and group so that we maintain the same userid and groupid across all systems and don't conflict with users in our LDAP directory. It has always been a problem that both the forwarder and enterprise used the same name, since they expect different home directories (i.e. /opt/splunk vs. /opt/splunkforwarder), which means trying to centralize a splunk user doesn't work well.

I like the idea of having separate users/groups, but this was a surprise change to me, and I'm not sure what I am left with at the moment other than a currently broken install while I figure out the implications of the 1400+ messages of "warning: user splunkfwd does not exist - using root" and "warning: group splunkfwd does not exist - using root". Presumably I can just add the splunkfwd user and group, then change ownership and run some invocation of splunk enable boot-start -systemd-managed 1 -user splunkfwd -group splunkfwd to set up the systemd unit file, but this is not something I had planned on doing. Also not sure if I now need to change the user defined in /opt/splunkforwarder/etc/splunk-launch.conf.
Tried this out and came back with this. The format might be a little different than what you asked for, but I think it tells the same story.

| bucket span=1m _time
| stats count as TPS by _time
| eventstats min(TPS) as min_TPS, max(TPS) as max_TPS
| foreach *_TPS [ | eval <<MATCHSTR>>_TPS_epoch=if( 'TPS'=='<<MATCHSTR>>_TPS', mvappend( '<<MATCHSTR>>_TPS_epoch', '_time' ), '<<MATCHSTR>>_TPS_epoch' ) ]
| stats avg(TPS) as avg_TPS, first(*_TPS) as *_TPS, first(*_TPS_epoch) as *_TPS_epoch
| eval avg_TPS=round('avg_TPS', 2)
| foreach *_TPS_epoch [ | eval <<MATCHSTR>>_TPS_timestamps=case( mvcount('<<FIELD>>')==1, strftime('<<FIELD>>', "%x %X"), mvcount('<<FIELD>>')>1, mvmap('<<FIELD>>', strftime('<<FIELD>>', "%x %X")) ), <<MATCHSTR>>_TPS_json=json_object( "type", "<<MATCHSTR>>", "TPS", '<<MATCHSTR>>_TPS', "Timestamps", '<<MATCHSTR>>_TPS_timestamps' ), combined_TPS_json=mvappend( 'combined_TPS_json', '<<MATCHSTR>>_TPS_json' ) ]
| fields + combined_TPS_json, avg_TPS
| addinfo
| eval search_time_window_end=strftime(info_max_time, "%x %X"), search_time_window_start=strftime(info_min_time, "%x %X"), avg_TPS_time_window='search_time_window_start'." --> ".'search_time_window_end'
| eval combined_TPS_json=mvappend( 'combined_TPS_json', json_object( "type", "avg", "TPS", 'avg_TPS', "Timestamps", 'avg_TPS_time_window' ) )
| mvexpand combined_TPS_json
| fromjson combined_TPS_json
| fields - combined_TPS_json
| fields + type, TPS, Timestamps

Output should look something like this. You should also be able to change the time bucket span from 1m back to 1s, since that is how it was set up in your initial query.
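The bucketing step at the top of that search (| bucket span=1m _time followed by stats count by _time) can be mimicked in Python to sanity-check the per-bucket counts. The event timestamps here are made-up epoch seconds:

```python
from collections import Counter

SPAN = 60  # seconds, matching span=1m

# Made-up event timestamps (epoch seconds).
event_times = [1000, 1010, 1059, 1060, 1061, 1125]

# Floor each timestamp to the start of its epoch-aligned bucket, then
# count events per bucket -- roughly what
# "| bucket span=1m _time | stats count as TPS by _time" does.
tps = Counter(t - t % SPAN for t in event_times)
```

Buckets are aligned to the epoch, not to the first event, so 1000 and 1059 land in different buckets (960 and 1020) even though they are less than a minute apart; SPL's bucket/bin behaves the same way.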
@bowesmana @yuanliu  Thank you for your help. I understand now: if I want to use Student="Total", I just use the following

| addcoltotals labelfield=Student label=Total
| eval Score = if(Student="Total", floor(Score) + 1, Score)

and if I leave the label blank, I can use

| eval Score = if(isnull(Student), floor(Score) + 1, Score)
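Note (per the comment above about still rounding up) that floor(Score)+1 and a true ceiling only differ when Score is already a whole number: floor+1 still bumps it up, while ceiling leaves it alone. A quick Python illustration:

```python
import math

def floor_plus_one(score):
    # Mirrors the SPL expression floor(Score) + 1.
    return math.floor(score) + 1

# For a fractional score the two approaches agree:
#   floor_plus_one(87.3) and math.ceil(87.3) are both 88.
# For an exact integer they differ:
#   floor_plus_one(90.0) is 91, but math.ceil(90.0) stays 90.
```

So if the intent is "round up only when there's a fraction", the SPL should use ceiling(Score) (or ceil) rather than floor(Score)+1.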
That is correct, but an indexer cluster would not solve that problem. I suggest having two HFs in an active/warm standby configuration.  There would need to be some means to copy state information from the active HF to the standby.  That would be a non-Splunk solution.
Hi Splunkers, I have a strange situation with some universal forwarders. On some Windows hosts, a colleague has installed the UF using the graphical wizard. Those forwarders must be managed with a Deployment Server. He did NOT use the "customize" options, so he has not set which logs must be sent to the HF (Application, Security, and so on) or a destination HF/Indexers. He only entered: the admin username and password, and the Deployment Server IP address and port. As written above, he didn't enter HF and/or Indexers; the idea is that once the UF has spoken with the Deployment Server, 2 apps that contain inputs.conf and outputs.conf are downloaded and, after that, logs are sent. On the Deployment Server (we checked), the apps that should be downloaded by the UF have been created and contain the above 2 files. So, why did I write "the apps that should be downloaded"? Well, since logs are not collected and sent to the HF, we performed some troubleshooting and found that the apps have not been downloaded. I mean: on the host where the UF is installed, if we go to $SplunkUFHOME$\etc\apps, the 2 apps are not present. That means no custom inputs.conf and outputs.conf are present on the UF; only the defaults provided with the installation are there. First thing we thought: OK, we have network issues. But it seems not: from the host with the UF, we are perfectly able to ping and telnet the deployment server on its port. At the same time, we can access the firewall that manages this traffic, and we don't see, in the firewall logs, any evidence of blocked/truncated connections. The UF can reach the DS and vice versa without issues. We then tried to manually copy the folders with the apps onto the UF (I know, a very bad thing, don't blame me please...) but the situation stays the same. So, the question is: if no network issues are present, what can be the root cause of the apps not being downloaded?
You should be able to utilize the built-in Splunk JSON commands and/or JSON functions to build out valid STIX 2.1 objects from Splunk events. Here is some sample SPL to give you an example of how to build the JSON from individual fields in Splunk.

| makeresults
| fields - _time
``` gen properties data ```
| eval enum=split("attack-pattern|campaign", "|"), description="The type of this object, which MUST be the literal `attack-pattern`.", type="string"
| tojson str(enum) str(type) str(description) output_field=properties
| fields - enum, type, description
``` gen ID data ```
| eval title="id", pattern="^attack-pattern--"
| tojson str(title) str(pattern) output_field=id
| fields - title, pattern
``` gen Name data ```
| eval type="string", description="The name used to identify the Attack Pattern."
| tojson str(type) str(description) output_field=name
| fields - type, description
``` gen description data ```
| eval type="string", description="A description that provides more details and context about the Attack Pattern, potentially including its purpose and its key characteristics."
| tojson str(type) str(description) output_field=description
| fields - type, description
``` gen kill_chain_phases data ```
| eval "$ref"="../common/kill-chain-phase.json"
| tojson str($ref) output_field=items
| fields - "$ref"
| eval type="array", description="The list of kill chain phases for which this attack pattern is used.", minItems=1
| tojson str(type) str(description) json(items) num(minItems) output_field=kill_chain_phases
| fields - minItems, items, description, type
| tojson json(properties) json(type) json(id) json(name) json(description) json(kill_chain_phases) output_field=allOf
| fields - properties, type, id, name, description, kill_chain_phases
| eval ref="../common/core.json"
| tojson str(ref) output_field=allOf_2
| eval allOf=mvappend( 'allOf_2', 'allOf' )
| fields - allOf_2
| eval required="name", type="object", description="Attack Patterns are a type of TTP that describe ways that adversaries attempt to compromise targets. ", title="attack-pattern", "$schema"="http://json-schema.org/draft-04/schema#"
| tojson str($schema) str(title) str(description) str(type) json(allOf) str(required) output_field=stix_2_payload
| fields + stix_2_payload

For this example I used a generative command to put together sample data first, but if you are building from a Splunk event, then the fields should all be derived from _raw or already extracted. The example is more of a demonstration of how to build a valid STIX 2.1 JSON object using Splunk. Below is the JSON object built out from the SPL above.
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "allOf": [
    { "ref": "../common/core.json" },
    {
      "id": { "pattern": "^attack-pattern--", "title": "id" },
      "kill_chain_phases": {
        "description": "The list of kill chain phases for which this attack pattern is used.",
        "items": { "$ref": "../common/kill-chain-phase.json" },
        "minItems": 1,
        "type": "array"
      },
      "name": {
        "description": "The name used to identify the Attack Pattern.",
        "type": "string"
      },
      "properties": {
        "description": "The type of this object, which MUST be the literal `attack-pattern`.",
        "enum": [ "attack-pattern", "campaign" ],
        "type": "string"
      }
    }
  ],
  "description": "Attack Patterns are a type of TTP that describe ways that adversaries attempt to compromise targets. ",
  "required": "name",
  "title": "attack-pattern",
  "type": "object"
}
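The same nesting can be sketched in plain Python with the standard json module, which is roughly what the chained tojson calls are doing (the values are copied from the example above; only the inner "properties" object is shown for brevity):

```python
import json

# Inner "properties" object, matching the first tojson in the SPL.
properties = {
    "type": "string",
    "description": "The type of this object, which MUST be the literal `attack-pattern`.",
    "enum": ["attack-pattern", "campaign"],
}

# Assemble the outer schema object the way the final tojson does,
# with the allOf list mirroring the mvappend of allOf_2 and allOf.
stix_2_payload = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "attack-pattern",
    "type": "object",
    "required": "name",
    "allOf": [
        {"ref": "../common/core.json"},
        {"properties": properties},
    ],
}

payload_json = json.dumps(stix_2_payload, sort_keys=True)
```

Building the inner objects first and composing them at the end keeps each tojson (or json.dumps) call small, which is the same design the SPL above follows.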
Hi richgallowy, thanks for your answer and clarifications. In case I install the IA on one HF and that HF goes down, I'm no longer collecting SentinelOne logs. Am I right?
The only inputs that should be enabled on an indexer are those that query the local server.  Otherwise, data duplication may result. Per the SentinelOne installation instructions, the inputs app should be installed on a heavy forwarder.  No matter what the indexer configuration is, the IA goes on a HF.  Only the TA should be installed on the indexers. FTR, it is not necessary to use an indexer cluster to have high availability on ingest.  HA on ingest is provided by having forwarders distribute data across more than one indexer.  Indexer clusters protect the data by having multiple copies of it; the extra copies offer HA at search time.