Processing Logs
Log Processing Flowchart
```mermaid
graph LR
subgraph gl ["Graylog Server"]
direction LR
grayloginput("Graylog Input")
pipeline("Pipeline
Processing")
subgraph extractors ["Extractors"]
direction BT
regex("Regex/Grok Patterns")
lut("Lookup Tables")
da("Data Adapters")
end
logs("Processed Logs")
end
grayloginput --> extractors -- Stream --> pipeline
pipeline --> logs;
regex -. (when needed) .-> lut;
lut --> da;
```
Processing Stages
The order of message processing is configurable in System > Configurations > Message Processors.
In this example, the order of processing is:
- Message Filter Chain (Extractors)
    - Extractors help parse log message data into useful fields
- Stream Rule Processor
    - Messages can match multiple streams based on message source and other criteria
    - Streams route messages to index sets or forward to outputs
    - Different index sets may have different retention policies
- Pipeline Processor
    - Pipelines consist of Stages, Rules and Functions
    - Rules define which messages should be processed by a Stage, and what Functions should be applied
    - Pipeline Functions can transform and manipulate message contents and parse message data into useful fields
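For orientation, every pipeline rule has the same shape: a when condition that decides whether the rule fires, and a then block that applies functions to the message. A trivial sketch (the field name and value here are just for illustration):

```
rule "Example: tag a message"
when
  // Condition: fire only for messages that already carry this field
  has_field("PacketType")
then
  // Action: apply functions that modify the message
  set_field("processed_by_pipeline", true);
end
```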
I am using both extractors and pipeline rules. It would be possible, and maybe more computationally efficient, to do everything with pipeline rules, but a bit more difficult to configure (see also: "if it ain't broke"). For comparison, a pipeline-rule version of one extractor is sketched in the Extractors section below.
Raw Message
```xml
<Event>
<Timestamp data_type="4">05/02/2024 11:23:13.729</Timestamp>
<Computer-Name data_type="1">dan-nps-1</Computer-Name>
<Event-Source data_type="1">IAS</Event-Source>
<Acct-Status-Type data_type="0">1</Acct-Status-Type>
<Acct-Authentic data_type="0">1</Acct-Authentic>
<User-Name data_type="1">john_doe@iu13.org</User-Name>
<Called-Station-Id data_type="1">AC-23-16-F2-15-11:IU13Employee</Called-Station-Id>
<NAS-Port-Type data_type="0">19</NAS-Port-Type>
<Service-Type data_type="0">2</Service-Type>
<Calling-Station-Id data_type="1">52-F6-D8-CB-01-2D</Calling-Station-Id>
<Connect-Info data_type="1">CONNECT 0Mbps 802.11b</Connect-Info>
<Acct-Session-Id data_type="1">26ECD9A769A00227</Acct-Session-Id>
<Acct-Multi-Session-Id data_type="1">A64281482EF1BDCD</Acct-Multi-Session-Id>
<Class data_type="1">311 1 10.10.10.10 12/29/2023 21:55:52 6903028</Class>
<Event-Timestamp data_type="4">05/02/2024 15:23:13</Event-Timestamp>
<Acct-Delay-Time data_type="0">0</Acct-Delay-Time>
<NAS-IP-Address data_type="3">10.13.41.33</NAS-IP-Address>
<Client-IP-Address data_type="3">10.13.41.33</Client-IP-Address>
<Client-Vendor data_type="0">0</Client-Vendor>
<Client-Friendly-Name data_type="1">IU13 APs</Client-Friendly-Name>
<Proxy-Policy-Name data_type="1">Staff Wireless</Proxy-Policy-Name>
<Packet-Type data_type="0">4</Packet-Type>
<Reason-Code data_type="0">0</Reason-Code>
</Event>
```
Note
The message is really on a single line -- here I added line breaks for readability.
Since the data is already structured, we simply need to extract the contents of each XML element to a field in Graylog.
Enriching the Data
Some fields, such as `Packet-Type` and `Reason-Code`, are enumerated types that are coded as integers:

```xml
<Packet-Type data_type="0">4</Packet-Type>
<Reason-Code data_type="0">0</Reason-Code>
```
What does a `Packet-Type` of `4` indicate? We can transform these to human-readable values using lookup tables.
| Packet-Type | Description |
| --- | --- |
| 1 | Access-Request |
| 2 | Access-Accept |
| 3 | Access-Reject |
| 4 | Accounting-Request |
| 5 | Accounting-Response |
| 11 | Access-Challenge |
| 12 | Status-Server (experimental) |
| 13 | Status-Client (experimental) |
| 255 | Reserved |
I have created CSV files with mappings for seven enum fields. They are available at https://github.com/bakerds/NPS-Log-Parsing
Graylog can consume these CSV files using a Data Adapter, and make them available for use as a Lookup Table.
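Each mapping file is just a two-column CSV. As a sketch, the Packet-Type file could look like this (the column names are configurable in the CSV File data adapter, so treat these headers as an assumption rather than the repo's exact layout):

```csv
"key","value"
"1","Access-Request"
"2","Access-Accept"
"3","Access-Reject"
"4","Accounting-Request"
"5","Accounting-Response"
"11","Access-Challenge"
"12","Status-Server (experimental)"
"13","Status-Client (experimental)"
"255","Reserved"
```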
Extractors
- Navigate to System > Inputs
- Locate the input you created earlier and click Manage extractors
- Click Create extractor
- Click Load Message to load a recent message from the input. This message will be used to test your new extractor
- Next to the message field, click Select extractor Type
- Choose Regular expression
- In this example, we will extract the `Packet-Type` field value: `<Packet-Type data_type="0">4</Packet-Type>`
- To extract the value `4` from this part of the message, we'll use this regular expression: `<Packet-Type data_type="0">(\d+)</Packet-Type>`
- Click the Try button to see if it works
- We only want the extractor to run if the `Packet-Type` tag is present in the message, so select Only attempt extraction if field contains string and enter `Packet-Type` in the Field contains string textbox
- Enter `PacketType` in Store as field

Note
The field name cannot contain a hyphen -- only alphanumeric characters and underscores are allowed
- Enter `Packet-Type` in the Extractor title textbox
- Next to Add converter, select "Lookup Table" and click the Add button
- Next to Lookup Table, select the "Packet-Type" lookup table
- Click Create Extractor
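As mentioned earlier, the same work could be done entirely in a pipeline rule instead of an extractor. A minimal sketch for comparison, assuming a lookup table named "packet-type" (substitute whatever you named yours):

```
rule "Extract Packet-Type (pipeline version)"
when
  // Same guard as the extractor: only run when the tag is present
  contains(to_string($message.message), "Packet-Type")
then
  // Capture group "0" holds the digits matched by (\d+)
  let m = regex("<Packet-Type data_type=\"0\">(\\d+)</Packet-Type>", to_string($message.message));
  // Translate the numeric code through the lookup table
  set_field("PacketType", lookup_value("packet-type", to_string(m["0"])));
end
```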
You can create extractors for every field that's present in the logs. Most can be handled with a simple regex string.
Here are the fields I chose to extract:
Field List
Acct-Multi-Session-Id
Acct-Session-Id
Authentication-Type
Called-Station-Id
Calling-Station-Id
Class
Client-Friendly-Name
Client-IP-Address
Client-Vendor
Computer-Name
Connect-Info
DataRate
IEEEStandard
RSSI
Channel
EAP-Friendly-Name
Event-Source
Framed-IP-Address
Framed-MTU
Fully-Qualifed-User-Name
NAS-IP-Address
NAS-IPv6-Address
NAS-Port-Type
NP-Policy-Name
Timestamp
Packet-Type
Provider-Type
Proxy-Policy-Name
Quarantine-Update-Non-Compliant
Reason-Code
SAM-Account-Name
Service-Type
Session-Timeout
User-Name
To save time, you can import the rest of the extractors that I have already created:
- Navigate to System > Inputs
- Locate the input you created earlier and click Manage extractors
- Click Actions > Import extractors
- Paste the contents of this file in the text area and click Add extractors to input
Tip
For some variety, check out the Connect String Parser extractor. That one uses a Grok pattern to parse multiple sub-fields from the `Connect-Info` field.
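The sample message above carries `CONNECT 0Mbps 802.11b`, so a Grok pattern along these lines could split it into fields (a sketch only; the actual pattern in the import file may differ):

```
CONNECT %{INT:DataRate}Mbps 802.11%{WORD:IEEEStandard}
```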
Bug
`Fully-Qualifed-User-Name` is misspelled -- by the NPS programmers! I have used their incorrect spelling in the extractor title and field name to keep it consistent with the source log files.
Pipeline Processing
We have a timestamping issue with our log messages. Graylog does not know how to read the timestamp field within an NPS log message, so it will record the time when it received the message, not the time the message was generated by NPS.
We can fix that by using a pipeline rule to parse the NPS timestamp within the log message, and replace the timestamp Graylog recorded when the message was received at the input.
NPS uses a US-style timestamp format (MM/dd/yyyy HH:mm:ss.SSS) that looks like this: 04/05/2023 10:08:50.910
Heads up!
NPS uses the server's local time and does not indicate the timezone, so we must specify a timezone when we parse the timestamp.
If you have servers in multiple timezones, you will need to handle that somehow (one approach is sketched after the pipeline steps below).
- Navigate to System > Pipelines and click the Manage rules tab
- Click Create Rule
- Click Use Source Code Editor
- Give the rule a helpful description
- In the Rule Source field, paste the following code:
rule "Convert NPS Timestamp" when has_field("nps_timestamp") then let nps_timestamp = $message.nps_timestamp; let new_timestamp = parse_date(to_string(nps_timestamp), "MM/dd/yyyy HH:mm:ss.SSS", "en-US", "America/New_York"); set_field("timestamp", new_timestamp); end
- Click Create Rule
- Navigate to System > Pipelines and click Add new pipeline
- Give the pipeline a name and click Create pipeline
- New pipelines have one stage called "Stage 0" with no rules
- Click Edit next to "Stage 0"
- Under Stage Rules, select the "Convert NPS Timestamp" rule you just created
- Click Update stage
- Back in the Pipeline details, click Edit connections
- Under Streams, select the "Default Stream"
- Click Update connections
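Regarding the multi-timezone caveat above: one straightforward approach is a separate copy of the timestamp rule per site, keyed off the reporting server, with each copy connected to the same stage. A sketch, where the field name, hostname, and timezone are all assumptions for illustration:

```
rule "Convert NPS Timestamp (Chicago site)"
when
  // ComputerName and the hostname are placeholders -- match on whatever
  // reliably distinguishes your sites
  has_field("nps_timestamp") && to_string($message.ComputerName) == "nps-chi-1"
then
  let new_timestamp = parse_date(to_string($message.nps_timestamp), "MM/dd/yyyy HH:mm:ss.SSS", "en-US", "America/Chicago");
  set_field("timestamp", new_timestamp);
end
```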
Test
- Navigate to System > Inputs
- Locate the input you created earlier and click Show received messages
- With any luck, you should see some messages in the table -- with data parsed into useful, aggregable, and searchable fields! 🎉