In incident response, time is precious and something you can never get back. Typically, when I receive a security alert about an endpoint, it requires manual lookups across multiple data sources for critical pieces of information. These manual lookups are time-consuming, create fatigue, and fail to use the power of technology to your advantage. This blog post demonstrates a proof of concept (POC) that uses Corelight's network Community ID hash to fuse endpoint- and network-based data sources.
KSQL by Confluent provides the ability to enrich independent data sources by correlating common attributes. In this POC, we are going to use Sysmon or Osquery to monitor the endpoint and Zeek to monitor the network. In addition to serving as a POC, this post discusses the architecture, design decisions, working infrastructure-as-code, and the knowledge I accumulated from this project. The hope is that this POC will serve as a framework the infosec community can use to perform log enrichment. Lastly, I will demonstrate the power of this POC by detecting a Powershell Empire agent that has been injected into explorer.exe.
DISCLAIMER
The information contained in this blog post is for educational purposes ONLY! HoldMyBeerSecurity.com/HoldMyBeer.xyz and its authors DO NOT hold any responsibility for any misuse of, or damage caused by, the information provided in blog posts, discussions, activities, or exercises.
DISCLAIMER
Problem statement
First, take a look at the screenshot on the left and attempt to determine if it’s a malicious connection. Now, look at a similar log event on the right that has been enriched and tell me if it’s malicious. The difference between the two log events is minimal but that process information makes a HUGE impact on your incident response capabilities and the time it takes to triage an alert. If you are not familiar with Windows systems, the process explorer.exe SHOULD NEVER be making a network connection, especially not to a DigitalOcean server in Bangalore, India.
By correlating endpoint-based events and network-based events, we can get the full picture of activity occurring on a machine. The left screenshot is a standard Zeek network connection event and the right screenshot is a Zeek network connection enriched with process information. This SMALL BUT crucial piece of information drastically changes how we develop analytics on our data and how we make decisions.
Assumptions
This blog post is written to be a proof of concept and not a comprehensive post. This post will NOT cover how Osquery, Kafka, Sysmon, KSQL, Splunk, or Docker works; it assumes you know how these technologies work. Second, this blog post contains setups and configurations that may NOT be production-ready. The “Lessons learned” section discusses various improvements for this implementation.
Assumptions
Background
What is a Community ID?
Community ID is a new feature being implemented by network security applications such as Zeek and Suricata. A Community ID is a hash of the connection 5-tuple (source IP address, destination IP address, source port, destination port, protocol), which uniquely identifies a connection. For example, let’s say Suricata detects malicious activity, and when you examine the details of the alert it includes a unique hash as the value of community_id. We can copy that Community ID hash and search for it in the Zeek logs to review all the logs related to that connection.
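To make this concrete, here is a minimal sketch of computing a Community ID yourself with Corelight's pycommunityid library (an assumption for illustration; install with pip install communityid). Any tool that hashes the same 5-tuple, whether Zeek, Suricata, or a Beats processor, produces the same value, which is what makes cross-source pivoting possible.

```python
import communityid

# Made-up example flow: endpoint 192.168.1.50:52837 -> 64.227.160.10:80 over TCP
cid = communityid.CommunityID()
flow = communityid.FlowTuple.make_tcp("192.168.1.50", "64.227.160.10", 52837, 80)

# Prints a value of the form "1:<base64 digest>"; the same tuple always
# yields the same Community ID regardless of which tool computed it.
print(cid.calc(flow))
```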
What is Splunk’s Common Information Model (CIM)?
Splunk’s Common Information Model (CIM) is a model that Splunk administrators can follow (it is not enforced) so that all data sets share the same structure. For example, Osquery uses local_address for the source IP address but Zeek uses id.orig_h, which at first glance is not a friendly convention. The Splunk Network Traffic CIM states this field should be named src_ip for all network logging sources. This makes a huge impact because instead of having to remember the naming convention for each logging source, you only have to know the convention for a source type. Furthermore, the cross-correlation between Kafka topics with KSQL becomes much easier if the data sources share the same field names, as you will see below.
The architecture of the pipeline
A majority of this architecture is built on previous work I have done in a recent blog post called My Logging Pipeline: Logstash, Kafka, Splunk. Throughout this blog post, I will reference that post for brevity. If you would like a more thorough understanding of each component in this pipeline, please check out that blog post.
Network diagram
Tools and services overview
- Osquery – exposes an operating system as a high-performance relational database. This allows you to write SQL-based queries to explore operating system data. With Osquery, SQL tables represent abstract concepts such as running processes, loaded kernel modules, open network connections, browser plugins, hardware events or file hashes.
- Sysmon – System Monitor (Sysmon) is a Windows system service and device driver that, once installed on a system, remains resident across system reboots to monitor and log system activity to the Windows event log. It provides detailed information about process creations, network connections, and changes to file creation time. By collecting the events it generates using Windows Event Collection or SIEM agents and subsequently analyzing them, you can identify malicious or anomalous activity and understand how intruders and malware operate on your network.
- Zeek (formerly known as Bro) – is a passive, open-source network traffic analyzer. It is primarily a security monitor that inspects all traffic on a link in depth for signs of suspicious activity. More generally, however, Bro supports a wide range of traffic analysis tasks even outside of the security domain, including performance measurements and helping with troubleshooting.
- Logstash – is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.
- Kafka – is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.
- KSQL – is the streaming SQL engine for Apache Kafka. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka without the need to write code in a programming language such as Java or Python. KSQL is scalable, elastic, fault-tolerant, and real-time. It supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.
- Splunk – is an advanced, scalable, and effective technology that indexes and searches log files stored in a system. It analyzes machine-generated data to provide operational intelligence. The main advantage of using Splunk is that it does not need any database to store its data, as it extensively makes use of its indexes to store the data. Splunk is software mainly used for searching, monitoring, and examining machine-generated Big Data through a web-style interface.
The flow of data explained
This section will explain the data pipeline in detail: how data is generated on the endpoint, how logs are shipped from the endpoint to Logstash, how logs are normalized by Logstash, how logs are correlated and enriched by KSQL, and how Kafka Connect sends the enriched logs to Splunk. First, we have a Windows 10 endpoint that is running Sysmon + Winlogbeat or Osquery + Polylogyx + Filebeat. Later in this blog post I discuss the pros and cons of each setup, but for now all you need to know is that these setups monitor which processes create network connections on the endpoint.
The Osquery option implements the Polylogyx extension pack to monitor the endpoint, specifically for network connections. On a periodic interval (default 30 seconds), Osquery requests all the recorded network connections from the Polylogyx extension and generates a network Community ID for each connection using the Osquery community_id_v1 function. Next, Filebeat reads the C:\Program Files\osquery\osquery.results.log log for new entries and ships those entries off to Logstash for ingestion using the Beats protocol.
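As a rough sketch, the scheduled query could look something like the following osquery.conf fragment. This is illustrative only (the actual osquery.conf, osquery.flags, and extensions.load used in this POC are downloaded in the install section below); the query mirrors the one run interactively later in this post and the 30-second interval mentioned above.

```json
{
  "schedule": {
    "win_socket_events": {
      "query": "SELECT community_id_v1(local_address, remote_address, local_port, remote_port, protocol) AS community_id, * FROM win_socket_events WHERE action='SOCKET_CONNECT';",
      "interval": 30,
      "description": "Network connections from the Polylogyx extension, tagged with a Community ID"
    }
  }
}
```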
The Sysmon option utilizes Winlogbeat for network Community ID generation. The Filebeat and Winlogbeat logging clients have a capability called “processors”, which provides the ability to “enhance data before sending it to the configured output”. One of these built-in processors is the Community ID Network Flow Hash processor, which can read network connection logs generated by Sysmon and enrich each log with a Community ID based on values within that log entry.
For this POC we utilized the SwiftOnSecurity Sysmon config to monitor the Windows endpoint. The activity monitored by Sysmon is recorded to the Windows Event Log at Applications and Services Logs/Microsoft/Windows/Sysmon/Operational. Winlogbeat is configured to read the Windows Event Log for new Sysmon entries, enrich each entry with a Community ID, and ship the logs off to Logstash for ingestion using the Beats protocol.
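Below is a stripped-down sketch of the relevant winlogbeat.yml pieces (the full config used in this POC is downloaded during the install steps). It assumes Winlogbeat's Sysmon processing has already mapped the event data to the ECS source.*/destination.* fields that the community_id processor reads by default, so treat it as an outline rather than a drop-in config.

```yaml
winlogbeat.event_logs:
  # Read Sysmon events from the Windows Event Log channel
  - name: Microsoft-Windows-Sysmon/Operational

processors:
  # Compute the Community ID hash from the ECS network fields
  # and store it in network.community_id
  - community_id: ~

output.logstash:
  # Placeholders that are replaced during the install steps below
  hosts: ["logstash_ip_addr:logstash_port"]
```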
Roughly at the same time, Zeek is monitoring the network where the Windows endpoint(s) reside, recording all network connections. Zeek has been configured with the Zeek Package Manager (zkg) to log all Zeek events in JSON format and to append the network Community ID to each event. With the json-streaming-logs package, Zeek writes all logs to /opt/zeek/logs/current/json_streaming_*.log, and Filebeat monitors this directory for new events. When Filebeat detects a new log entry, it ships the event to Logstash for ingestion using the Beats protocol.
The Logstash component of this pipeline deserves its own section because several important steps occur here. Logstash is typically configured with three phases: inputs, filters, and outputs. Logstash is configured to ingest logs using the Beats protocol, which is the protocol used by Filebeat and Winlogbeat. Next, a series of Logstash filters is applied to the individual log sources (Osquery, Sysmon, and Zeek). The Osquery filter inspects all logs containing the osquery tag. If a log matches this tag, the message payload is extracted to the root of the JSON structure, the Osquery key names are normalized per the Splunk Network Traffic CIM (such as renaming remote_address to dest_ip), the epoch timestamp is converted to ISO8601, and lastly the metadata generated by Filebeat is removed.
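A hedged sketch of what such a filter could look like is shown below; the field paths (for example [columns][remote_address] and unixTime) are illustrative, and the actual pipeline files live in the repository linked at the end of this post.

```
filter {
  if "osquery" in [tags] {
    # Promote the Osquery result payload to the root of the event
    json { source => "message" }

    # Rename Osquery fields to Splunk Network Traffic CIM names
    mutate {
      rename => {
        "[columns][local_address]"  => "src_ip"
        "[columns][remote_address]" => "dest_ip"
      }
    }

    # Convert the epoch timestamp to ISO8601 (@timestamp)
    date { match => [ "unixTime", "UNIX" ] }

    # Drop metadata added by Filebeat
    mutate { remove_field => [ "agent", "ecs", "input", "log" ] }
  }
}
```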
The Sysmon filter inspects all logs containing the sysmon tag. If a log matches this tag, the key event.code is renamed to event_code. Next, a new field named event_name is created and populated from a static dictionary that maps Sysmon event codes to human-friendly names, such as event_code: 3 becoming event_name: NetworkConnectInitiated. For this POC, a simple filter was created to target Sysmon event_code: 3, and when a log event matches that code the log is normalized per the Splunk Network Traffic CIM. Lastly, the filter removes the metadata generated by Winlogbeat.
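The static dictionary can be implemented with the Logstash translate filter (mentioned again in the lessons learned); a minimal sketch with only the mapping this POC cares about:

```
filter {
  if "sysmon" in [tags] {
    mutate { rename => { "[event][code]" => "event_code" } }

    # Map Sysmon event codes to human-friendly names
    translate {
      field       => "event_code"
      destination => "event_name"
      dictionary  => { "3" => "NetworkConnectInitiated" }   # other codes omitted
    }
  }
}
```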
The Zeek filter inspects all logs containing the zeek tag. If a log matches this tag, the message payload is extracted to the root of the JSON structure, any Zeek logs that don’t contain the field _path are dropped, the Zeek key names are normalized (id.orig_h -> src_ip, id.resp_h -> dest_ip) per the Splunk Network Traffic CIM, and lastly the metadata generated by Filebeat is removed.
The final phase of Logstash is the output config. The output section only executes on logs that contain the tags discussed above in the filters. The output phase is configured to send all logs with the zeek tag to Kafka; more specifically, each Zeek log type is sent to its own topic via topic_id => "zeek-%{[_path]}". This configuration extracts _path (the Zeek log type: DNS, conn, x509, SSL, etc.) and sends the event to the matching topic. In addition to sending all Zeek logs to Kafka, Logstash ensures delivery by instructing Kafka to send back an ACK when it receives a message, much like TCP. The Osquery and Sysmon outputs follow a similar flow, so for brevity I won't repeat them.
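A sketch of the Zeek output is shown below; the broker address is a placeholder, and acks => "1" is the acknowledgement behavior described above.

```
output {
  if "zeek" in [tags] {
    kafka {
      bootstrap_servers => "kafka:9092"        # placeholder broker address
      topic_id          => "zeek-%{[_path]}"   # one topic per Zeek log type
      acks              => "1"                 # require the broker to ACK each message
      codec             => json
    }
  }
}
```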
Next, KSQL is configured to ingest logs from the Kafka topics created by Logstash in order to correlate and enrich logs from each data source. First, a KSQL stream is created for each data source's Kafka topic. Each stream is then used to create a second stream that is keyed (ROWKEY) on the value of the Community ID. This key gives every entry a value that can be used for correlation across sources.
For Sysmon, the KSQL streams sysmon_stream_keyed (which contains Sysmon events about network connections) and zeek_conn_stream_keyed (which contains Zeek network events) are used to create zeek_sysmon_stream. This stream is the fusion of Zeek network events enriched with process information from the Sysmon logs, correlated on the row key, which is the network Community ID. The Osquery setup uses the same approach but with the streams osquery_win_socket_events_stream_keyed and zeek_conn_stream_keyed.
The last phase of this pipeline is Kafka Connect reading the enriched logs from zeek_osquery_stream or zeek_sysmon_stream and sending them to Splunk to be ingested. Furthermore, Kafka Connect can be configured to also send the Kafka topics zeek-conn, osquery-win_socket_events, and sysmon-NetworkConnectInitiated (which contain the original data sources) to Splunk.
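For reference, the helper script described later in this post creates a connector roughly like the following. This is a hypothetical sketch of a Splunk Connect for Kafka sink config; the real values (index name, topics, HEC token) come from conf/python/config.yml.

```json
{
  "name": "splunk-sink-zeek-sysmon",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "topics": "ZEEK_SYSMON_STREAM",
    "splunk.hec.uri": "https://splunk:8088",
    "splunk.hec.token": "<HEC token created by the helper script>",
    "splunk.indexes": "zeek_sysmon",
    "splunk.hec.ssl.validate.certs": "false"
  }
}
```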
Alternative implementations?
Zeek-osquery and Bro-Sysmon are open-source alternatives to my implementation. Both utilize Zeek/Bro on your local network to ingest and correlate network and endpoint events. However, these alternatives have some differences, which I will discuss for each platform below. The goal of this section is to discuss alternative solutions and to compare each one.
- Bro-Sysmon
- The project hasn’t been updated in two years – all configurations are written for Bro v2.x and are not compatible with Zeek v3.x out of the box
- Requires a Python middleman between the log ingestor (Logstash) and Zeek
- Provides the capability to utilize JA3 hashes.
- Zeek-Osquery
- The query scheduler is Zeek, which means you can’t use something like Kolide or Uptycs
- Osquery needs to be compiled from source to add the necessary functionality
- Built and tested with an old version of Osquery (3.3.0); the current version at the time of this writing is 4.3.0
- Only supports an older version of Bro (v2.6.4)
- Bro needs to be compiled from source to include the necessary functionality
Choosing your own adventure
Osquery vs. Sysmon
Both platforms have their pros and cons which means you need to pick the best platform for YOUR needs. Below I have covered the differences between the two platforms with respect to this blog post.
- Sysmon
- Pros
- SwiftOnSecurity’s config, which is a recognized and actively maintained configuration
- Real-time monitoring
- Cons
- Log events are written in XML to Windows Event logger
- Requires a driver
- Limited to monitoring 22 event types
- Osquery
- Pros
- Actively maintained project by Facebook, Kolide, and Uptycs
- Vibrant and supportive Slack community for questions and issues
- Provides the ability to be connected to a query scheduler like Uptycs or Kolide
- Provides unlimited possibilities to extract information due to the 200+ tables
- Provides the ability for further investigation and evidence collection with Uptycs and Kolide
- If connected to a query scheduler, there is no need for Filebeat to ship logs
- Cons
- Requires a driver with Polylogyx extensions
- Uses an event buffer that, when full on a busy system, deletes old events
- Near real-time monitoring only, because the query runs on an interval
- Polylogyx extension bundle can consume a lot of memory and CPU depending on the query
Install/Setup Sysmon + Filebeat on Windows 10
Install Sysmon
- Open Powershell as Administrator
cd $ENV:TMP
Invoke-WebRequest -Uri https://download.sysinternals.com/files/Sysmon.zip -OutFile Sysmon.zip
Expand-Archive .\Sysmon.zip -DestinationPath .
- Unzipping Sysmon
Invoke-WebRequest -Uri https://raw.githubusercontent.com/SwiftOnSecurity/sysmon-config/master/sysmonconfig-export.xml -OutFile sysmonconfig-export.xml
.\Sysmon.exe -accepteula -i .\sysmonconfig-export.xml
- Install Sysmon and load config
Install Winlogbeat
cd $ENV:TMP
Invoke-WebRequest -Uri https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-7.7.0-windows-x86_64.zip -OutFile winlogbeat-7.7.0-windows-x86_64.zip
Expand-Archive .\winlogbeat-7.7.0-windows-x86_64.zip -DestinationPath .
- Unzipping Winlogbeat
mv .\winlogbeat-7.7.0-windows-x86_64 'C:\Program Files\winlogbeat'
cd 'C:\Program Files\winlogbeat\'
Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/BlogProjects/master/sysmon-winlogbeat-communityid/conf/winlogbeat/winlogbeat.yml -OutFile winlogbeat.yml
- Using your favorite text editor, open 'C:\Program Files\winlogbeat\winlogbeat.yml'
- Open the document from the command line with Visual Studio Code:
code .\winlogbeat.yml
- Open the document from the command line with Notepad:
notepad.exe .\winlogbeat.yml
- Scroll down to the output.logstash: section
- Replace logstash_ip_addr with the IP address or FQDN of Logstash
- Replace logstash_port with the port Logstash uses to ingest Beats (default 5044)
powershell -Exec bypass -File .\install-service-winlogbeat.ps1
- Install Winlogbeat
Set-Service -Name "winlogbeat" -StartupType automatic
Start-Service -Name "winlogbeat"
Get-Service -Name "winlogbeat"
Install/Setup Osquery + Filebeat on Windows 10
Install/Setup Osquery on Windows 10
- Open Powershell as Administrator
Invoke-WebRequest -Uri https://pkg.osquery.io/windows/osquery-4.2.0.msi -OutFile osquery-4.2.0.msi -MaximumRedirection 3
- Download Osquery
Start-Process $ENV:TEMP\osquery-4.2.0.msi -ArgumentList '/quiet' -Wait
- Install Osquery
Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/extensions.load -OutFile 'C:\Program Files\osquery\extensions.load'
Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/osquery.conf -OutFile 'C:\Program Files\osquery\osquery.conf'
Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/osquery.flags -OutFile 'C:\Program Files\osquery\osquery.flags'
Install/Setup Polylogyx-osquery extensions
cd $ENV:TEMP
Invoke-WebRequest -Uri https://github.com/polylogyx/osq-ext-bin/archive/v1.0.40.1.zip -OutFile osq-ext-bin.zip -MaximumRedirection 3
Expand-Archive .\osq-ext-bin.zip -DestinationPath .
cd osq-ext-bin-*
(Get-Content -Path .\plgxupgrade.ps1 -Raw) -replace "Env:Programdata","Env:ProgramFiles" | Set-Content -Path .\plgxupgrade.ps1
powershell -Exec bypass -File .\plgxupgrade.ps1
- Ignore errors that the “vast” and “vastnw” service can’t be found
New-Item -Path "C:\Program Files\osquery" -Name "Extensions" -ItemType "directory"
- Create Extensions directory
Copy-Item 'C:\Program Files\osquery\plgx_win_extension.ext.exe' -Destination 'C:\Program Files\osquery\Extensions\plgx_win_extension.ext.exe'
- Copy extension to directory
cd 'C:\Program Files\osquery'
icacls .\Extensions /setowner Administrators /t
icacls .\Extensions /grant Administrators:f /t
icacls .\Extensions /inheritance:r /t
icacls .\Extensions /inheritance:d /t
Test Osquery-polylogyx and win_socket_events table
Stop-Service -Name osqueryd
.\osqueryi.exe --flagfile .\osquery.flags
.tables
SELECT * FROM win_socket_events;
SELECT community_id_v1(local_address,remote_address,local_port,remote_port,protocol) as community_id, * FROM win_socket_events WHERE action='SOCKET_CONNECT';
.quit
Start-Service osqueryd
Get-Service osqueryd
Install/Setup Filebeat agent for Windows
cd $ENV:TEMP
Invoke-WebRequest -Uri https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.0-windows-x86_64.zip -OutFile filebeat-7.7.0-windows-x86_64.zip -MaximumRedirection 3
- Download Filebeat
Expand-Archive .\filebeat-7.7.0-windows-x86_64.zip -DestinationPath .
- Unzip Filebeat
Copy-Item ".\filebeat-7.7.0-windows-x86_64\" -Destination 'C:\Program Files\Filebeat\' -Recurse
- Copy the Filebeat directory to C:\Program Files
cd 'C:\Program Files\Filebeat'
Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/BlogProjects/master/ksql-osquery-zeek-correlator/conf/windows-filebeat/filebeat.yml -OutFile 'C:\Program Files\Filebeat\filebeat.yml'
- Using your favorite text editor, open 'C:\Program Files\Filebeat\filebeat.yml'
- Open the document from the command line with Visual Studio Code:
code .\filebeat.yml
- Open the document from the command line with Notepad:
notepad.exe .\filebeat.yml
- Scroll down to the output.logstash: section
- Replace logstash_ip_addr with the IP address or FQDN of Logstash
- Replace logstash_port with the port Logstash uses to ingest Beats (default 5044)
powershell -Exec bypass -File .\install-service-filebeat.ps1
Start-Service -Name filebeat
Get-Service -Name filebeat
Install/Setup Zeek 3.X + Filebeat on Ubuntu 20.04
Install Zeek from a package
sudo sh -c "echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_20.04/ /' > /etc/apt/sources.list.d/security:zeek.list"
wget -nv https://download.opensuse.org/repositories/security:zeek/xUbuntu_20.04/Release.key -O Release.key
sudo apt-key add - < Release.key
sudo apt-get update -y
sudo apt-get install zeek -y
/opt/zeek/bin/zeek -v
Setup Zeek
node.cfg
- Run the command ip a and determine the interface you want Zeek to monitor
sudo sed -i 's/interface=eth0/interface=<interface you want Zeek to monitor>/g' /opt/zeek/etc/node.cfg
cat /opt/zeek/etc/node.cfg
networks.cfg
- Determine the networks Zeek will be monitoring
sudo vim /opt/zeek/etc/networks.cfg
- Add the network CIDRs
Install/Setup Zeek Package Manager (zkg)
sudo su
apt-get install python3 python3-pip -y
pip3 install zkg
export PATH=/opt/zeek/bin:$PATH
- Add Zeek binaries to your $PATH
zkg autoconfig
printf "\n\n# Load ZKG packages\n@load packages\n" | sudo tee -a /opt/zeek/share/zeek/site/local.zeek
Install JSON and CommunityID package
zkg install json-streaming-logs
sed -i 's#const JSONStreaming::disable_default_logs = F#const JSONStreaming::disable_default_logs = T#g' /opt/zeek/share/zeek/site/json-streaming-logs/main.bro
- Disable traditional TSV logging
apt-get install cmake -y
zkg install zeek-community-id
/opt/zeek/bin/zeekctl check
sed -i 's/bro_init()/zeek_init()/g' /opt/zeek/share/zeek/site/packages/json-streaming-logs/main.bro
Start Zeek
/opt/zeek/bin/zeekctl deploy
/opt/zeek/bin/zeekctl status
Test Zeek setup
curl https://www.google.com
tail -f /opt/zeek/logs/current/json_streaming_conn.log
Install/Setup Filebeat for Zeek
Install Filebeat
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
apt-get install apt-transport-https -y
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt-get update -y && apt-get install filebeat -y
systemctl enable filebeat
Setup Filebeat
mkdir /etc/filebeat/conf.d
curl https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/filebeat/zeek/filebeat.yml -o /etc/filebeat/filebeat.yml
curl https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/filebeat/zeek/zeek.yml -o /etc/filebeat/conf.d/zeek.yml
sed -i 's/logstash_ip_addr/<Logstash IP addr or FQDN>/g' /etc/filebeat/filebeat.yml
sed -i 's/logstash_port/<Logstash port>/g' /etc/filebeat/filebeat.yml
sudo systemctl restart filebeat
Deploy logging stack with Docker
Spin up Docker stack
docker-compose build
docker-compose up -d
docker stats
Setup KSQL with KSQL client
Typically, I dislike it when people run commands inside containers after docker-compose brings them up, but I do it here to demonstrate how this works behind the scenes.
docker ps
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
SHOW topics;
Test zeek-conn topic
- Run this on Zeek
curl http://google.com
PRINT 'zeek-conn';
Test Osquery topic
- Run this on the Windows endpoint
Invoke-WebRequest -Uri https://google.com
PRINT 'osquery-win_socket_events';
Test Sysmon topic
- Open Powershell
Invoke-WebRequest -Uri https://google.com
PRINT 'sysmon-NetworkConnectInitiated';
Create Zeek conn KSQL stream
CREATE STREAM zeek_conn_stream (creation_time VARCHAR, uid VARCHAR, conn_type VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, conn_state VARCHAR, local_orig VARCHAR, local_resp VARCHAR, missed_bytes BIGINT, history VARCHAR, orig_pkts BIGINT, orig_ip_bytes BIGINT, resp_pkts BIGINT, resp_ip_bytes BIGINT, community_id VARCHAR ) WITH ( kafka_topic='zeek-conn', value_format='JSON', timestamp='creation_time', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z''');
DESCRIBE zeek_conn_stream;
CREATE STREAM zeek_conn_stream_keyed AS SELECT * FROM zeek_conn_stream PARTITION BY COMMUNITY_ID;
SELECT * FROM zeek_conn_stream_keyed EMIT CHANGES;
Create Sysmon network KSQL stream
CREATE STREAM sysmon_network_stream ( creation_time VARCHAR, event_code INTEGER, community_id VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, direction VARCHAR, hostname VARCHAR, process_id INTEGER, process_name VARCHAR, process_exec VARCHAR, OS VARCHAR) WITH ( kafka_topic='sysmon-NetworkConnectInitiated', value_format='JSON', timestamp='creation_time', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS''Z''');
DESCRIBE sysmon_network_stream;
CREATE STREAM sysmon_stream_keyed AS SELECT * FROM sysmon_network_stream PARTITION BY COMMUNITY_ID;
SELECT * FROM sysmon_stream_keyed EMIT CHANGES;
Create Osquery KSQL stream
CREATE STREAM osquery_win_socket_events_stream (create_timestamp VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, username VARCHAR, hostname VARCHAR, action VARCHAR, community_id VARCHAR, name VARCHAR, process_name VARCHAR, process_id integer) WITH ( kafka_topic='osquery-win_socket_events', value_format='JSON', timestamp='create_timestamp', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS''Z''');
DESCRIBE osquery_win_socket_events_stream;
CREATE STREAM osquery_win_socket_events_stream_keyed AS SELECT * FROM osquery_win_socket_events_stream PARTITION BY COMMUNITY_ID;
SELECT * FROM osquery_win_socket_events_stream_keyed EMIT CHANGES;
Combine Sysmon and Zeek stream
CREATE STREAM zeek_sysmon_stream AS SELECT z.creation_time, z.uid, z.src_ip, z.src_port, z.dest_ip, z.dest_port, z.conn_type, z.community_id, z.local_orig, z.orig_ip_bytes, z.missed_bytes, z.local_resp, z.history, z.resp_pkts, z.orig_pkts, z.resp_ip_bytes, z.conn_state, s.process_id, s.process_name FROM zeek_conn_stream_keyed as z JOIN sysmon_stream_keyed as s WITHIN 5 MINUTES ON z.community_id=s.community_id;
DESCRIBE zeek_sysmon_stream;
SELECT * FROM zeek_sysmon_stream EMIT CHANGES;
Combine Osquery and Zeek stream
CREATE STREAM zeek_osquery_stream AS SELECT z.creation_time, z.uid, z.src_ip, z.src_port, z.dest_ip, z.dest_port, z.conn_type, z.community_id, z.local_orig, z.orig_ip_bytes, z.missed_bytes, z.local_resp, z.history, z.resp_pkts, z.orig_pkts, z.resp_ip_bytes, z.conn_state, o.username, o.hostname, o.process_id, o.process_name FROM zeek_conn_stream_keyed as z JOIN osquery_win_socket_events_stream_keyed as o WITHIN 5 MINUTES ON z.community_id=o.community_id;
DESCRIBE zeek_osquery_stream;
SELECT * FROM zeek_osquery_stream EMIT CHANGES;
Configure Python script
As I stated above, this pipeline is based on My Logging Pipeline, and this helper script comes from that project; if you want to know more about the script, read that blog post. The TL;DR is that this Python script creates a Splunk index, an HEC input, and a Kafka Connect connector to send data from a Kafka topic to Splunk.
virtualenv -p python3 venv
source venv/bin/activate
pip3 install -r requirements.txt
mv conf/python/config.yml.example conf/python/config.yml
- Open conf/python/config.yml with your favorite text editor and set:
  - external_url – Replace <IP addr of Docker> with the IP address of Docker
  - connect_external_url – Replace <IP addr of Docker> with the IP address of Docker
  - index – Set the index name that will be created
    - For Osquery, set it to ZEEK_OSQUERY
    - For Sysmon, set it to ZEEK_SYSMON
  - topics – Set topics to a list of Kafka topics you want to be ingested by Splunk
    - For Osquery, set topics to ZEEK_OSQUERY_STREAM
    - For Sysmon, set topics to ZEEK_SYSMON_STREAM
Setup Kafka Connect and Splunk connector
python3 splunk-kafka-connector.py --all
Testing setup with Powershell Empire on Ubuntu 18.04
Install/Setup Powershell Empire
- Start a Ubuntu 18.04 VM
- Open terminal
sudo su
cd /opt && git clone https://github.com/BC-SECURITY/Empire.git
cd Empire
sudo ./setup/install.sh
./empire
Setup/Configure HTTP listener
listeners
uselistener http
set Name http80
set Port 80
execute
back
Create Powershell stager
usestager multi/launcher http80
execute
- Copy output
Detonate Powershell stager
- Open Powershell as Administrator
- Paste output from Empire above
Process injection
interact <agent ID>
psinject http80 explorer
agents
Query Splunk
- Browse to https://<IP addr of Docker> and log into Splunk
- Select Search
- Enter
index="zeek_osquery" PROCESS_NAME="C:\\Windows\\explorer.exe"
Lessons learned
I am currently reading the book “Cracking the Coding Interview” and it is a great book! One interesting part of the book is its matrix for describing projects you have worked on, which contains the following sections: challenges, mistakes/failures, enjoyed, leadership, conflicts, and what you’d do differently. I am going to try to use this model at the end of my blog posts moving forward to summarize and reflect on the things I learned as well as the challenges. The purpose of this lessons-learned section is to distill the experiences from this security research project that should be taken into account in future projects and to share the knowledge from my journey.
New skills/knowledge
- Learned how to use the Logstash translate filter
- Learned how to set up and configure Sysmon with Winlogbeat
- Learned how to set up Logstash and Filebeat with TLS without the need for client certificates
- Learned how to set up and configure Osquery with Polylogyx extensions
- Learned how to use the network CommunityID hash to correlate data sources
- Learned how to use Osquery decorators to append system information to each result
- Used the new Powershell Empire by BC-Security
- Learned how SUPER IMPORTANT it is to set a row key when JOINING two KSQL streams
- First time I installed the Zeek binary instead of compiling it from source
Challenges
- Getting Polylogyx to work with newer versions of Osquery
- Having to create Logstash filters for each data source to adhere to the Splunk Network Traffic CIM
- Timestamps led to A LOT of complications. Sysmon and Zeek use the same timestamp format in UTC, so correlating those events was easy, but the same was not true for Osquery. Since the Osquery timestamps did not match, KSQL had issues correlating events within a window.
- Troubleshooting a complicated system when issues arise.
Mistakes/Failures
- Assuming I could use KSQL just like SQL and JOIN on any column, when only rowkeys can be joined.
- Assuming that when JOINing streams in KSQL, the WITHIN X window applied to when the event arrived at KSQL, NOT when the event was generated. Zeek was producing timestamps in UTC+0 but Osquery was using the local timezone, which was a 6-hour difference. So while the events arriving at KSQL were never more than 2 minutes apart, the generated timestamps had huge discrepancies.
- The local time displayed by Windows was correct, but the wrong timezone was selected in Windows. Osquery reads the timezone selected by Windows and generates timestamps based on it. The local time displayed was 04:09 PM EST but the timezone was -8, which is PST, so Osquery was generating “incorrect” timestamps.
What You’d Do Differently
- Incorporate Suricata and EDR technologies
- Create signatures for processes and how/with whom they should communicate. For example, explorer.exe will make external calls to Microsoft but it shouldn’t be making calls to a random IP address.
- Engineer a solution to do this POC with macOS and Linux
- In the current implementation of this POC, KSQL requires an endpoint network event and a Zeek network event with the required attributes. If the endpoint were to stop logging, the Zeek events would have nothing to correlate with, and after 5 minutes KSQL ignores those events. Unfortunately, I am unaware of a setting that would flush data from the pipeline if a match is not found within X amount of time.
Projects inspired by this work
- Osquery community ID – Since Osquery didn’t have the communityID function, I had to write my own Osquery Python extension, which led to the creation of multiple Osquery extensions
- My Logging pipeline
- The architecture in this blog post inspired that architecture design
DISCLAIMER
The information contained in this blog post is for educational purposes ONLY! HoldMyBeerSecurity.com/HoldMyBeer.xyz and its authors DO NOT hold any responsibility for any misuse of, or damage caused by, the information provided in blog posts, discussions, activities, or exercises.
DISCLAIMER
References
- Zeek – Installing
- Osquery – Downloads
- Zeek Package Manager – Installation
- Zeek Package Manager – Homepage
- Zeek blog – Community_ID
- How to stop and disable auditd on RHEL 7
- Process and socket auditing with osquery
- Github – corelight/json-streaming-logs
- How to ship JSON logs via Rsyslog
- QueryCon 2018 | Chris Long (Palantir) – Practical System Auditing with Osquery
- Working with Nested JSON Data
- KSQL Syntax Reference
- Download Winlogbeat
- Github – Elasticsearch 7.0 Docker
- Stack Overflow – Error when trying to join a table and a stream
- Filebeat download
- Install Filebeat
- Creating a Composite Key
- Correlate network connections with Community ID in Osquery
- Github – polylogyx/osq-ext-bin
- Filebeat – Define processors
- Deploying Osquery Part 2 – The basics, cont.
- QueryConf – PolyLogyx Extension
This is amazing! I want to get this POC working in my lab and I have some questions regarding the Logstash input and output. Do you happen to have the input/output conf files for Logstash? Which fields exactly were normalized by Logstash?
I’m trying to do a POC for Sysmon/Winlogbeat along with Zeek/Filebeat to Logstash >> Kafka.
Hey Phil,
I post all my infra-as-code on my Github, but the configs you are looking for are located at https://github.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/tree/master/conf/logstash/pipeline.
What are you using to separate Kafka topics per source? It looks like winlogbeat.yml is tagged with a single tag name “sysmon” but KSQL shows multiple topics for Sysmon. Same question for Zeek as well.
Hey Phil,
Please take a look at this Logstash output config for how I am splitting up the sources
Thanks for sharing.