PoC: Using KSQL to enrich Zeek logs with Osquery and Sysmon data

In incident response, time is precious and something you can never get back. Typically, when I receive a security alert about an endpoint, it requires manual lookups on multiple data sources for critical pieces of information. These manual lookups can be time-consuming, create fatigue, and don’t use the power of technology to your advantage. This blog post will demonstrate a proof-of-concept (POC) by using the power of a network community ID hash by Corelight to fuse endpoint and network-based data sources.

KSQL by Confluent provides the ability to enrich independent data sources by correlating common attributes. In this POC, we are going to use Sysmon or Osquery to monitor the endpoint and Zeek to monitor the network. Not only will this blog post serve as a POC but it will discuss the architecture, design decisions, working infrastructure-as-code, and the knowledge I accumulated from this project. The hope is that this POC will serve as a framework for the infosec community to use to perform log enrichment. Lastly, I will demonstrate the power of this POC by detecting a Powershell Empire agent that has been injected into explorer.exe.


The information contained in this blog post is for educational purposes ONLY! HoldMyBeerSecurity.com/HoldMyBeer.xyz and its authors DO NOT hold any responsibility for any misuse or damage of the information provided in blog posts, discussions, activities, or exercises. 


Problem statement

First, take a look at the screenshot on the left and attempt to determine if it’s a malicious connection. Now, look at a similar log event on the right that has been enriched and tell me if it’s malicious. The difference between the two log events is minimal but that process information makes a HUGE impact on your incident response capabilities and the time it takes to triage an alert. If you are not familiar with Windows systems, the process explorer.exe SHOULD NEVER be making a network connection, especially to a DigitalOcean server in Bangalore, India.

By correlating endpoint-based events and network-based events, we can get the full picture of activity occurring on a machine. The left screenshot is a standard Zeek network connection event and the right screenshot is a Zeek network connection enriched with process information data. This SMALL BUT crucial piece of information drastically changes the game and the way we develop analytics on our data and our decision making.


This blog post is written to be a proof of concept and not a comprehensive post. This post will NOT cover how Osquery, Kafka, Sysmon, KSQL, Spunk, or Docker works, therefore this post assumes you know how these technologies work. Second, this blog post contains setups and configurations that may NOT be production-ready. The “Lessons learned” section discusses various improvements for this implementation.



What is a communityID?

CommunityID is a new feature being implemented by networking security applications such as Zeek and Suricata. A CommunityID is a hash of the tuple (destination IP address, source IP address, destination port, source port, protocol) and this tuple defines a unique connection. For example, let’s say Suricata detects malicious activity and when you examine the details of the alert it includes a unique hash as the value of communityID. Next, we can copy this unique communityID hash and search for it in the Zeek logs to review all the logs related to this connection.

What is Splunk’s Common Information Model (CIM)?

Splunk’s Common Information Model (CIM) is a model for Splunk administrators to follow (but it is not enforced) so all data sets have the same structure. For example, Osquery uses local_address for the source IP address but Zeek uses id.orig_h which at first glance is not a friendly convention. The Splunk Network Traffic CIM states this field name should be src_ip for all network logging sources. This makes a huge impact because instead of having to remember the naming convention for each logging source you just have to know the logging convention for a source type. Furthermore, the cross-correlation between Kafka topics with KSQL becomes much easier if the indexes have the same fields, as you will see below.

The architecture of the pipeline

A majority of this architecture is built on previous work I have done in a recent blog post called My Logging Pipeline: Logstash, Kafka, Splunk. Throughout this blog post, I will reference that post for brevity. If you would like a more thorough understanding of each component in this pipeline, please check out that blog post.

Network diagram

Tool and services overview

  • Osquery – exposes an operating system as a high-performance relational database. This allows you to write SQL-based queries to explore operating system data. With Osquery, SQL tables represent abstract concepts such as running processes, loaded kernel modules, open network connections, browser plugins, hardware events or file hashes.
  • Sysmon – System Monitor (Sysmon) is a Windows system service and device driver that, once installed on a system, remains resident across system reboots to monitor and log system activity to the Windows event log. It provides detailed information about process creations, network connections, and changes to file creation time. By collecting the events it generates using Windows Event Collection or SIEM agents and subsequently analyzing them, you can identify malicious or anomalous activity and understand how intruders and malware operate on your network
  • Zeek(formerly known as Bro) – is a passive, open-source network traffic analyzer. It is primarily a security monitor that inspects all traffic on a link in depth for signs of suspicious activity. More generally, however, Bro supports a wide range of traffic analysis tasks even outside of the security domain, including performance measurements and helping with troubleshooting.
  • Logstash – is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases. 
  • Kafka – is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.
  • KSQL – is the streaming SQL engine for Apache Kafka. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka without the need to write code in a programming language such as Java or Python. KSQL is scalable, elastic, fault-tolerant, and real-time. It supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.
  • Splunk – is an advanced, scalable, and effective technology that indexes and searches log files stored in a system. It analyzes the machine-generated data to provide operational intelligence. The main advantage of using Splunk is that it does not need any database to store its data, as it extensively makes use of its indexes to store the data. Splunk is a software mainly used for searching, monitoring, and examining machine-generated Big Data through a web-style interface

The flow of data explained

This section will explain the data pipeline in detail starting with how data is generated on the endpoint, how logs are shipped from the endpoint to Logstash, how logs are normalized by Logstash, how logs are correlated and enriched by KSQL, and how Kafka Connect sends the enriched logs to Splunk. First, we have a Windows 10 endpoint that is running Sysmon + Winlogbeat or Osquery + Polylogtx + Filebeat. Later in this blog post, I discuss the pros and cons of each setup but for now, all you need to know is these setups monitor which processes create network connections on the endpoint.

The Osquery option implements the Polylogyx extension pack to monitor the endpoint, specifically for network connections. On a periodic interval (default 30 seconds), Osquery will request all the recorded network connections from the Polyllogyx extension to generate a network communityID for each connection utilizing the Osquery community_id_v1 function. Next, Filebeat reads the C:\Program Files\osquery\osquery.results.log log for new entries and ships those entries off to Logstash for ingestion using the Beats protocol.

The Sysmon option utilizes Winlogbeat for network CommunityID generation. The Filebeat and Winlogbeat logging clients have a capability called “processors” which provides the ability to “enhance data before sending it to the configured output”. One of these builtin processors is the Community ID Network Flow Hash which can read network connection logs generated by Sysmon and enrich the log with a Community ID based on values within that log entry.

For this POC we utilized the SwiftOnSeccurity Sysmon config to monitor the Windows endpoint. The activity monitored by Sysmon is recorded to the Windows Event Log located at Applications and Services Logs/Microsoft/Windows/Sysmon/Operational. Winlogbeat is configured to read the Windows Event Log for new Sysmon entries, enrich the log entry with CommunityID, and ship the logs off to Logstash for ingestion using the Beats protocol.

Roughly at the same time, Zeek is monitoring the network where the Windows endpoint(s) reside recording all the network connections. Zeek has been configured with the Zeek Package Manager (zkg) to log all Zeek events in JSON format and to append the network CommunityID to each event. By default Zeek logs all connections to /opt/zeek/logs/current/json_streaming_*.log and Filebeat monitors this directory for new events. When Filebeat detects a new log entry it ships the event to Logstash for ingestion using the Beats protocol.

The Logstash component of this pipeline deserves its own section because a lot of important steps occur. Logstash is typically configured with the following phases: inputs, filters, and outputs. Logstash is configured to ingest logs using the Beats protocol, which is the protocol used by Filebeat and Winlogbeat. Next, there are a series of Logstash filters that are applied to individual log sources (Osquery, Sysmon, and Zeek). The Osquery filter inspects all logs containing the osquery tag. If a log matches this tag, the message payload is extracted to the root of the JSON structure and the key names of Osquery are normalized using the Splunk Network Traffic CIM, such as converting remote_address to dest_ip, converting the epoch timestamp to ISO8601, and lastly removing metadata generated by Filebeat.

The Sysmon filter inspects all logs containing the sysmon tag. If a log matches this tag, the key name event.code is renamed to event_code. Next, a new field named event_name is created which is populated based on a static dictionary that maps Sysmon event codes to the human-friendly name, such as event_code: 3 is event_name: NetworkConnectInitiated.  Next, for this POC a simple filter was created to target Sysmon event_code: 3 and when a log event matches that code the log is normalized using the Splunk Network Traffic CIM. Lastly, the filter removes the metadata generated by Winlogbeat.

The Zeek filter inspects all logs containing the zeek tag. If a log matches this tag, the message payload is extracted to the root of the JSON structure, any Zeek logs that don’t contain the field _path are dropped, key names of Zeek are normalized (id.orig_h -> src_ip, id.resp_h -> dest_ip) using the Splunk Network Traffic CIM, and lastly the metadata generated by Filebeat is removed.

The final phase of Logstash is the output config. The output section only executes on logs that contain the tags discussed above in the filters. The output phase is configured to send all logs with the zeek tag to Kafka and more specifically each Zeek log type is sent to its own topic via topic_id => "zeek-%{[_path]}". This line configuration will extract _path (Zeek log type: DNS, conn, x509, SSL, etc) and send it to that topic. In addition to sending all Zeek logs to Kafka, Logstash ensures delivery by instructing Kafka to send back an ACK if it received the message, like TCP. The Osquery and Sysmon outputs follow a similar flow and I won’t repeat myself for brevity.

Next, KSQL is configured to ingest logs from Kafka topics created by Logstash to correlate and enrich logs from each data source. First, each Kafka topic for a data source has an associated KSQL stream created. Each stream is then used to create another stream but this stream is created with a key (rowkey) which is the value of the communityID. This rowkey (CommunityID) is used to populate each entry with a unique value that is used for correlation.

For Sysmon the Kafka topic sysmon_stream_keyed (which contains Sysmon events about network connections) and zeek_conn_stream_keyed (which contains Zeek network events) are used to create zeek_sysmon_stream. This Kafka stream is the fusion of Zeek network events that have been enriched by process information from Sysmon logs that were correlated using the rowkey which is the network CommunityID. The Osquery setup uses the same approach but with the following Kafka topics osquery_win_socket_events_stream_keyed and zeek_conn_stream_keyed.

The last phase of this pipeline is Kafka Connect reading the enriched logs from zeek_osquery_stream or zeek_sysmon_stream and sending them to Splunk to be ingested. Furthermore, Kafka Connect can be configured to send the Kafka topics zeek-conn, osquery_win_socket_events, and sysmon-NetworkConnectInitiated (which contains the original data sources) to Splunk.

Alternative implementations?

Zeek-osquery and Bro-Sysmon are open-source alternatives to my implementation. Both of these alternatives can utilize Zeek/BRO on your local network to ingest and correlate logs between network and endpoint events. However, these alternative solutions have some differences which I will discuss independently for each platform. The goal of this section is to discuss alternative solutions and to compare each one.

  •   Bro-Sysmon
    • A project that hasn’t been updated in two years – All configurations are written for BRO v2.X and are not compatible with Zeek v3.X out of the box
    • Requires a Python middle man between the log ingestor (Logstash) and Zeek
    • Provides the capability to utilize JA3 hashes.
  • Zeek-Osquery
    • The query scheduler is Zeek, which means you can’t use something like Kolide or Upytcs
    • Osquery needs to be compiled from source to add functionality
      • Built and tested with an old version of Osquery 3.3.0 and the current version at the time of this writing is 4.3.0
    • Only supports an older version of Bro v2.6.4
      • Bro needs to be compiled from source to include functionality

Choosing your own adventure

Osquery vs. Sysmon

Both platforms have their pros and cons which means you need to pick the best platform for YOUR needs. Below I have covered the differences between the two platforms with respect to this blog post.

  • Sysmon
    • Pros
      • SwiftOnSecurity’s config which is a recognized and actively maintained configuration
      • Real-time monitoring
    • Cons
      • Log events are written in XML to Windows Event logger
      • Requires a driver
      • Limited to monitor 22 event types
  • Osquery
    • Pros
      • Actively maintained project by Facebook, Kolide, and Uptycs
      • Provides the ability to be connected to a query scheduler like Uptycs or Kolide
      • Provides unlimited possibilities to extract information due to the 200+ tables
      • Provides the ability for further investigation and evidence collection with Upytcs and Kolide
      • If connected to a task scheduler no need for Filebeat to ship logs
    • Cons

Install/Setup Sysmon + Filebeat on Windows 10

Install Sysmon

  1. Open Powershell as Administrator
  2. cd $ENV:TMP
  3. Invoke-WebRequest -Uri https://download.sysinternals.com/files/Sysmon.zip -OutFile Sysmon.zip
    1. Download latest Symon
  4. Expand-Archive .\Sysmon.zip -DestinationPath .
    1. Unzipping Sysmon
  5. Invoke-WebRequest -Uri https://raw.githubusercontent.com/SwiftOnSecurity/sysmon-config/master/sysmonconfig-export.xml -OutFile sysmonconfig-export.xml
    1. Download SwiftOnSecurity Sysmon config
  6. .\Sysmon.exe -accepteula -i .\sysmonconfig-export.xml
    1. Install Sysmon and load config

Install Winlogbeat

  1. cd $ENV:TMP
  2. Invoke-WebRequest -Uri https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-7.7.0-windows-x86_64.zip -OutFile winlogbeat-7.7.0-windows-x86_64.zip
    1. Download latest Winlogbeat version
  3. Expand-Archive .\winlogbeat-7.7.0-windows-x86_64.zip -DestinationPath .
    1. Unzipping Winlogbeat
  4. mv .\winlogbeat-7.7.0-windows-x86_64 'C:\Program Files\winlogbeat'
  5. cd 'C:\Program Files\winlogbeat\'
  6. Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/BlogProjects/master/sysmon-winlogbeat-communityid/conf/winlogbeat/winlogbeat.yml -OutFile winlogbeat.yml
  7. Using your favorite text editor open 'C:\Program Files\winlogbeat\winlogbeat.yml'
    1. Open the document from the command line with Visual Studio Code: code .\winlogbeat.yml
    2. Open the document from the command line with Notepad: notepad.exe.\winlogbeat.yml
  8. Scroll down to the output.logstash:
    1. Replace logstash_ip_addr with the IP address of FQDN of Logstash
    2. Replace logstash_port with the port Logstash uses to ingest Beats (default 5044)
  9. powershell -Exec bypass -File .\install-service-winlogbeat.ps1
    1. Install Winlogbeat
  10. Set-Service -Name "winlogbeat" -StartupType automatic
  11. Start-Service -Name "winlogbeat"
  12. Get-Service -Name "winlogbeat"

Install/Setup Osquery + Filebeat on Windows 10

Install/Setup Osquery on Windows 10

  1. Open Powershell as Administrator
  2. Invoke-WebRequest -Uri https://pkg.osquery.io/windows/osquery-4.2.0.msi -OutFile osquery-4.2.0.msi -MaximumRedirection 3
    1. Download Osquery
  3. Start-Process $ENV:TEMP\osquery-4.2.0.msi -ArgumentList '/quiet' -Wait
    1.  Install Osquery
  4. Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/extensions.load -OutFile 'C:\Program Files\osquery\extensions.load'
  5. Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/osquery.conf -OutFile 'C:\Program Files\osquery\osquery.conf'
  6. Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/osquery/osquery.flags -OutFile 'C:\Program Files\osquery\osquery.flags'

Install/Setup Polylogyx-osquery extensions

  1. cd $ENV:TEMP
  2. Invoke-WebRequest -Uri https://github.com/polylogyx/osq-ext-bin/archive/v1.0.40.1.zip -OutFile osq-ext-bin.zip -MaximumRedirection 3
  3. Expand-Archive .\osq-ext-bin.zip -DestinationPath .
  4. cd osq-ext-bin-*
  5. (Get-Content -Path .\plgxupgrade.ps1 -Raw) -replace "Env:Programdata","Env:ProgramFiles" | Set-Content -Path .\plgxupgrade.ps1
  6. powershell -Exec bypass -File .\plgxupgrade.ps1
    1. Ignore errors that the “vast” and “vastnw” service can’t be found
  7. New-Item -Path "C:\Program Files\osquery" -Name "Extensions" -ItemType "directory"
    1. Create Extensions directory
  8. Copy-Item C:\Program Files\osquery\plgx_win_extension.ext.exe -Destination 'C:\Program Files\osquery\Extensions\plgx_win_extension.ext.exe'
    1. Copy extension to directory
  9. cd 'C:\Program Files\osquery'
  10. icacls .\Extensions /setowner Administrators /t
  11. icacls .\Extensions /grant Administrators:f /t
  12. icacls .\Extensions /inheritance:r /t
  13. icacls .\Extensions /inheritance:d /t

Test Osquery-polylogyx and win_socket_events table

  1. Stop-Service -Name osqueryd
  2. .\osqueryi.exe --flagfile .\osquery.flags
  3. .tables
  4. SELECT * FROM win_socket_events;
  5. SELECT community_id_v1(local_address,remote_address,local_port,remote_port,protocol) as community_id, * FROM win_socket_events WHERE action='SOCKET_CONNECT';
  6. .quit
  7. Start-service osqueryd
  8. Get-Service osqueryd

Install/Setup Filebeat agent for Windows

  1. cd $ENV:TEMP
  2. Invoke-WebRequest -Uri https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.0-windows-x86_64.zip -OutFile filebeat-7.7.0-windows-x86_64.zip -MaximumRedirection 3
    1. Download Filebeat
  3. Expand-Archive .\filebeat-7.7.0-windows-x86_64.zip -DestinationPath .
    1. Unzip Filebeat
  4. Copy-Item ".\filebeat-7.7.0-windows-x86_64\" -Destination 'C:\Program Files\Filebeat\' -Recurse
    1. Copy Filebeat directory to C:\Program Files
  5. cd 'C:\Program Files\Filebeat'
  6. Invoke-WebRequest -Uri https://raw.githubusercontent.com/CptOfEvilMinions/BlogProjects/master/ksql-osquery-zeek-correlator/conf/windows-filebeat/filebeat.yml -OutFile 'C:\Program Files\Filebeat\filebeat.yml'
  7. Using your favorite text editor open 'C:\Program Files\winlogbeat\winlogbeat.yml'
    1. Open the document from the command line with Visual Studio Code: code .\winlogbeat.yml
    2. Open the document from the command line with Notepad: notepad.exe.\winlogbeat.yml
  8. Scroll down to the output.logstash:
    1. Replace logstash_ip_addr with the IP address of FQDN of Logstash
    2. Replace logstash_port with the port Logstash uses to ingest Beats (default 5044)
  9. powershell -Exec bypass -File .\install-service-filebeat.ps1
  10. Start-Service -Name filebeat
  11. Get-Service -Name filebeat

Install/Setup Zeek 3.X + Filebeat on Ubuntu 20.04

Install Zeek from a package

  1. sudo sh -c "echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_20.04/ /' > /etc/apt/sources.list.d/security:zeek.list"
  2. wget -nv https://download.opensuse.org/repositories/security:zeek/xUbuntu_20.04/Release.key -O Release.key
  3. sudo apt-key add - < Release.key
  4. sudo apt-get update -y
  5. sudo apt-get install zeek -y
  6. /opt/zeek/bin/zeek -v

Setup Zeek


  1. Run the command ip a
  2. Determine the interface you want Zeek to monitor
  3. sudo sed -i 's/interface=eth0/interface=<interface you want Zeek to monitor>/g' /opt/zeek/etc/node.cfg
  4. cat /opt/zeek/etc/node.cfg


  1. Determine the networks Zeek will be monitoring
  2. sudo vim /opt/zeek/etc/networks.cfg
  3. Add the network CIDRs

Install/Setup Zeek Package Manager (zkg)

  1. sudo su
  2. apt-get install python3 python3-pip -y
  3. pip3 install zkg
  4. export PATH=/opt/zeek/bin:$PATH
    1. Add Zeek binaries to your $PATH
  5. zkg autoconfig
  6. printf "\n\n# Load ZKG packages\[email protected] packages\n" | sudo tee -a /opt/zeek/share/zeek/site/local.zeek

Install JSON and CommunityID package

  1. zkg install json-streaming-logs
  2. sed -i 's#const JSONStreaming::disable_default_logs = F#const JSONStreaming::disable_default_logs = T#g' /opt/zeek/share/zeek/site/json-streaming-logs/main.bro
    1. Disable traditional TSV logging
  3. apt-get install cmake -y
  4. zkg install zeek-community-id
  5. /opt/zeek/bin/zeekctl check
  6. sed -i 's/bro_init()/zeek_init()/g' /opt/zeek/share/zeek/site/packages/./json-streaming-logs/./main.bro
  7. sed -i 's/bro_init()/zeek_init()/g' /opt/zeek/share/zeek/site/packages/./json-streaming-logs/./main.bro

Start Zeek

  1. /opt/zeek/bin/zeekctl deploy
  2. /opt/zeek/bin/zeekctl status

Test Zeek setup

  1. curl https://www.google.com
  2. tail -f /opt/zeek/logs/current/json_streaming_conn.log

Install/Setup Filebeat for Zeek

Install Filebeat

  1. wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
  2. apt-get install apt-transport-https -y
  3. echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
  4. apt-get update -y && apt-get install filebeat -y
  5. systemctl enable filebeat

Setup Filebeat

  1. mkdir /etc/filebeat/conf.d
  2. curl https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/filebeat/zeek/filebeat.yml -o /etc/filebeat/filebeat.yml
  3. curl https://raw.githubusercontent.com/CptOfEvilMinions/KSQL-Sysmon-Osquery-Zeek/master/conf/filebeat/zeek/zeek.yml -o /etc/filebeat/conf.d/zeek.yml
  4. sed -i 's/logstash_ip_addr/<Logstash IP addr or FQDN>/g' /etc/filebeat/filebeat.yml
  5. sed -i 's/logstash_port/<Logstash port>/g' /etc/filebeat/filebeat.yml
  6. sudo systemctl restart filebeat

Deploy logging stack with Docker

Spin up Docker stack

  1. docker-compose build
  2. docker-compose up -d
  3. docker stats

Setup KSQL with KSQL client

Typically, I dislike when people run commands after docker-compose inside containers but this is to demonstrate how this works behind the scenes.

  1. docker ps
  2. docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  3. SHOW topics;

Test zeek-conn topic

  1. Run this on Zeek curl http://google.com
  2. PRINT 'zeek-conn';

Test Osquery topic

  1. Run this on the Windows endpoint Invoke-WebRequest -Uri https://google.com
  2. PRINT 'osquery-win_socket_events';

Test Sysmon topic

  1. Open Powershell
  2. Invoke-WebRequest -Uri https://google.com
  3. PRINT 'sysmon-NetworkConnectInitiated';

Create Zeek conn KSQL stream

  1. CREATE STREAM zeek_conn_stream (creation_time VARCHAR, uid VARCHAR, conn_type VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, conn_state VARCHAR, local_orig VARCHAR, local_resp VARCHAR, missed_bytes BIGINT, history VARCHAR, orig_pkts BIGINT, orig_ip_bytes BIGINT, resp_pkts BIGINT, resp_ip_bytes BIGINT, community_id VARCHAR ) WITH ( kafka_topic='zeek-conn', value_format='JSON', timestamp='creation_time', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z''');
  2. DESCRIBE zeek_conn_stream;
  3. CREATE STREAM zeek_conn_stream_keyed AS SELECT * FROM zeek_conn_stream PARTITION BY COMMUNITY_ID;
  4. SELECT * FROM zeek_conn_stream_keyed EMIT CHANGES;

Create Sysmon network KSQL stream

  1. CREATE STREAM sysmon_network_stream ( creation_time VARCHAR, event_code INTEGER, community_id VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, direction VARCHAR, hostname VARCHAR, process_id INTEGER, process_name VARCHAR, process_exec VARCHAR, OS VARCHAR) WITH ( kafka_topic='sysmon-NetworkConnectInitiated', value_format='JSON', timestamp='creation_time', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS''Z''');
  2. DESCRIBE sysmon_network_stream;
  3. CREATE STREAM sysmon_stream_keyed AS SELECT * FROM sysmon_network_stream PARTITION BY COMMUNITY_ID;
  4. SELECT * FROM sysmon_stream_keyed EMIT CHANGES;


Create Osquery KSQL stream

  1. CREATE STREAM osquery_win_socket_events_stream (create_timestamp VARCHAR, src_ip VARCHAR, src_port INTEGER, dest_ip VARCHAR, dest_port INTEGER, protocol VARCHAR, username VARCHAR, hostname VARCHAR, action VARCHAR, community_id VARCHAR, name VARCHAR, process_name VARCHAR, process_id integer) WITH ( kafka_topic='osquery-win_socket_events', value_format='JSON', timestamp='create_timestamp', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSS''Z''');
  2. DESCRIBE osquery_win_socket_events_stream;
  3. CREATE STREAM osquery_win_socket_events_stream_keyed AS SELECT * FROM osquery_win_socket_events_stream PARTITION BY COMMUNITY_ID;
  4. SELECT * FROM osquery_win_socket_events_stream_keyed EMIT CHANGES;

Combine Sysmon and Zeek stream

  1. CREATE STREAM zeek_sysmon_stream AS SELECT z.creation_time, z.uid, z.src_ip, z.src_port, z.dest_ip, z.dest_port, z.conn_type, z.community_id, z.local_orig, z.orig_ip_bytes, z.missed_bytes, z.local_resp, z.history, z.resp_pkts, z.orig_pkts, z.resp_ip_bytes, z.conn_state, s.process_id, s.process_name FROM zeek_conn_stream_keyed as z JOIN sysmon_stream_keyed as s WITHIN 5 MINUTES ON z.community_id=s.community_id;
  2. DESCRIBE zeek_sysmon_stream;
  3. SELECT * FROM zeek_sysmon_stream EMIT CHANGES;

Combine Osquery and Zeek stream

  1. CREATE STREAM zeek_osquery_stream AS SELECT z.creation_time, z.uid, z.src_ip, z.src_port, z.dest_ip, z.dest_port, z.conn_type, z.community_id, z.local_orig, z.orig_ip_bytes, z.missed_bytes, z.local_resp, z.history, z.resp_pkts, z.orig_pkts, z.resp_ip_bytes, z.conn_state, o.username, o.hostname, o.process_id, o.process_name FROM zeek_conn_stream_keyed as z JOIN osquery_win_socket_events_stream_keyed as o WITHIN 5 MINUTES ON z.community_id=o.community_id;
  2. DESCRIBE zeek_osquery_stream;
  3. SELECT * FROM zeek_osquery_stream EMIT CHANGES;

Configure Python script

As I stated above, this pipeline is based on My Logging Pipeline and this helper script is from that project, if you wanna know more about this script read that blog post. However, the TLDR is this Python script that will create a Splunk index, an HEC input, and a Kafka Connect connector to send data from a Kafka topic to Splunk.

  1. virtualenv -p python3 venv
  2. source venv/bin/activate
  3. pip3 install -r requirements.txt
  4. mv conf/python/config.yml.example conf/python/config.yml
  5. Open conf/python/config.yml with your favorite text editor and set:
    1. external_url – Replace  <IP addr of Docker> with the IP address of Docker
    2. connect_extenral_url -Replace  <IP addr of Docker> with the IP address of Docker
    3. index – Set the index name that will be created
      1. For Osquery set topics to ZEEK_OSQUERY
      2. For Sysmon set topics to ZEEK_SYSMON
    4. topics – Set topics to a list of Kafka topics you want to be ingested by Splunk
      1. For Osquery set topics to ZEEK_OSQUERY_STREAM
      2. For Sysmon set topics to ZEEK_SYSMON_STREAM

Setup Kafka Connect and Splunk connector

  1. python3 splunk-kafka-connector.py --all

Testing setup with Powershell Empire on Ubuntu 18.04

Install/Setup Powershell Empire

  1. Start a Ubuntu 18.04 VM
  2. Open terminal
  3. sudo su
  4. cd /opt && git clone https://github.com/BC-SECURITY/Empire.git
  5. cd Empire
  6. sudo ./setup/install.sh
  7. ./empire

Setup/Configure HTTP listener

  1. listeners
  2. uselistener http
    1. set Name http80
    2. set Port 80
    3. execute
  3.  back

Create Powershell stager

  1. usestager multi/launcher http80
  2. execute
    1. Copy output

Detonate Powershell stager

  1. Open Powershell as Administrator
  2. Paste output from Empire above

Process injection

  1. interact <agent ID>
    1. psinject http80 explorer
  2. agents

Query Splunk

  1. Browse to https://<IP addr of Docker> and login into Splunk
  2. Select Search
  3. Enter index="zeek_osquery" PROCESS_NAME="C:\\Windows\\explorer.exe"

Lessons learned

I am currently reading a book called “Cracking the Coding Interview” and it is a great book!!! One interesting part of the book is their matrix to describe projects you worked on and the matrix contains the following sections which are: challenges, mistakes/failures, enjoyed, leadership, conflicts, and what would you’d do differently. I am going to try and use this model at the end of my blog posts moving forward to summarize and reflect on things I learned but also the challenges. The purpose of this lesson learned section is to distill the experiences from my security research project that should be actively taken into account in future projects and to share the knowledge of my journey.

New skills/knowledge

  • Learned how to use the Logstash translate filter
  • Learned how to set up and configure Sysmon with Winlogbeat
  • Learned how to set up Logstash and Filebeat with TLS without the need for client certificates
  • Learned how to set up and configure Osquery with Polylogyx extensions
  • Learned how to use the network CommunityID hash to correlate data sources
  • Learned how to use Osquery decorators to append system information to each result
  • Used the new Powershell Empire by BC-Security
  • Learned how SUPER IMPORTANT it is to set a row key when JOINING two KSQL streams
  • First time I installed the Zeek binary instead of compiling it from source


  • Getting Polylogyx to work with newer versions of Osquery
  • Having to create Logstash filters for each data source to adhere to the Splunk Network Traffic CIM
  • Timestamps lead to ALOT of complications. Sysmon and Zeek have the same timestamp format in UTC so correlating the events was easy but the same was not true for Osquery. Since the Osquery timestamp was not the same it caused issues with KSQL doing correlation within a window.
  • Troubleshooting a complicated system when issues arise.


  • Assuming I could use KSQL just like SQL and JOIN based on any row value but only rowkeys can be joined.
  • Assuming that when JOINing streams in KSQL that the WITHIN X amount timestamp was performing that action on when the event arrived to KSQL, NOT when the event was generated. Zeek was producing timestamps in UTC +0 but Osquery was using local timezone which was a 6-hour difference. So while the events coming into KSQL were never more than 2 minutes apart the generated timestamps had huge discrepancies.
  • The local time being displayed by Windows was correct but the incorrect timezone was selected for Windows. Osquery reads the timezone selected by Windows and generated timestamps based on that. The local time being displayed was 04:09 PM EST but the timezone was -8 which is PST. Osquery was generating “incorrect” timezones based on this.

What You’d Do Differently

  • Incorporate Suricata and EDR technologies
  • Create signatures for processes and how/who they should communicate with. For example, explorer.exe will make external calls to Microsoft but it shouldn’t be making calls to a random IP address.
  • Engineer a solution to do this POC with macOS and Linux
  • In the current implementation of this POC, KSQL requires an endpoint network event and a Zeek network event with the required attributes. If the endpoint were to stop logging then the Zeek events would not have an event to correlate and after 5 mins KSQL ignores those events. Unfortunately, I am unaware of a setting that would flush data in the pipeline if a match was not found, say within X amount of time.

Projects inspired by this work


The information contained in this blog post is for educational purposes ONLY! HoldMyBeerSecurity.com/HoldMyBeer.xyz and its authors DO NOT hold any responsibility for any misuse or damage of the information provided in blog posts, discussions, activities, or exercises. 



5 thoughts on “PoC: Using KSQL to enrich Zeek logs with Osquery and Sysmon data

  1. Phil says:

    this is amazing! I want to get this POC working on my lab and I have some questions regarding the logstash input and output. Do you happen to have the input/output conf file for logstash? What fields were exactly normalized by logstash?

    I’m trying to do POC for sysmon/winlogbeat along with zeek/filebeat to logstash >> kafka.

  2. Phil says:

    What are you using to separate kafka topics per source? it looks like winlogbeat.yml is tagged with a single tag name “sysmon” but ksql shows multiple topics for sysmon. Also same question for zeek as well.

  3. erotik izle says:

    Merci pour le partage.

Leave a Reply

Your email address will not be published.