Skip to main content

Logstash Configuration

The following Logstash configuration exposes a Beats input for incoming messages from Filebeat. For the messages to be parsed properly, the certificate_java_filter plugin must be installed in Logstash.

Certificate parser plugin

The certificate_java_filter plugin is used to consistently parse certificate data of the incoming messages.

To install the plugin, run the following:

/usr/share/logstash/bin/logstash-plugin install --no-verify --local /usr/share/logstash/plugins/logstash-filter-certificate_java_filter-1.0.0.gem
Working with plugins

For more information about how to work with Logstash plugins, refer to Logstash documentation.

Logstash pipeline

Update the configuration based on your environment:

  • ES_HOSTS - list of Elasticsearch hosts to be used for indexing of data
  • CA_CERT - path to the CA certificate file containing the trusted certificates, if used
  • ES_USER - username of the Elasticsearch user with permissions to index data
  • ES_PASSWORD - password of the Elasticsearch user with permissions to index data
Logstash configuration

The configuration may vary based on your environment and configuration of authentication and authorization. Refer to Logstash configuration for more information.

ejbca-filebeat-pipeline.conf

# Beats listener: accepts events on port 5044 on all interfaces (0.0.0.0).
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
# Events that carry no [tenant][id] are discarded before any other processing.
if ![tenant][id] {
  drop {}
}

# Keep the raw line under "original_message", move "logsource" to "hostname",
# and stamp every event with the node name, the default target index and a
# default log_type of "log" (later sections replace log_type when they match).
mutate {
  rename => {
    "logsource" => "[hostname]"
    "message" => "original_message"
  }

  add_field => {
    "log_type" => "log"
    "[@metadata][index]" => "dmr-ejbca-log"
    "logstash_node" => "lab05.3key.company"
  }
}

# Split the raw line into: timestamp, log level, logger (inside square
# brackets), thread (inside parentheses) and the remaining message text.
grok {
  match => {
    "original_message" => "%{TIMESTAMP_ISO8601:[ejbca][timestamp]}\s+%{LOGLEVEL:[ejbca][loglevel]}\s+\[%{NOTSPACE:[ejbca][logger]}\]\s+\(%{DATA:[ejbca][thread]}\)\s+%{GREEDYDATA:[ejbca][message]}"
  }
}

# check if there is no error during initial parsing of the message, and if not, continue, otherwise push failed message to elasticsearch
if "_grokparsefailure" not in [tags] {

# Use the parsed log timestamp as the event's @timestamp. The timezone is read
# from the event's own [event][timezone] field.
# NOTE(review): presumably [event][timezone] is set by the shipper — confirm
# that it is always present on incoming events.
date {
  target => "@timestamp"
  timezone => "%{[event][timezone]}"
  match => [ "[ejbca][timestamp]", "ISO8601" ]
}

# Normalize the thread name and discard fields that are no longer needed once
# the line has been parsed successfully.
mutate {
  remove_field => [ "original_message", "host", "timestamp" ]
  gsub => [
    # First collapse " - " to "-" (e.g. "default-threads - 24")
    "[ejbca][thread]", " - ", "-",
    # Then turn every remaining space into a dash
    "[ejbca][thread]", " ", "-"
  ]
}

################################### BEGIN - EJBCA Audit Messages ###################################
# An audit record is recognized by a message that begins with
# "YYYY-MM-DD HH:MM:SS+HH:MM;" followed by semicolon-separated fields, e.g.:
# 2018-10-29 16:55:14+01:00;ADMINWEB_ADMINISTRATORLOGGEDIN;SUCCESS;ADMINWEB;EJBCA;CN=Admin
# ;-1537224577;31557AE2AFC1994A;;remoteip=192.168.0.40
#
if [ejbca][message] =~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\+\d{2}:\d{2};/ {
  # NOTE(review): [ejbca][timestamp] is captured here a second time and no
  # "overwrite" option is configured — confirm how this capture is meant to
  # combine with the value extracted from the log line prefix.
  grok {
    match => {
      "[ejbca][message]" => "%{TIMESTAMP_ISO8601:[ejbca][timestamp]};%{DATA:[ejbca][event_type]};%{DATA:[ejbca][event_status]};%{DATA:[ejbca][module]};%{DATA:[ejbca][service]};%{DATA:[ejbca][auth_token]};%{DATA:[ejbca][custom_id]};%{DATA:[ejbca][search_detail_1]};%{DATA:[ejbca][search_detail_2]};%{GREEDYDATA:[ejbca][additional_details]}\n?"
    }
  }

  # Mark the event as an audit record.
  mutate {
    replace => { "log_type" => "audit" }
  }
}

# When both the trailing details and an event type were extracted, expand the
# ";"-separated key=value pairs into [ejbca][additional] with lowercased keys.
if [ejbca][additional_details] and [ejbca][event_type] {
  kv {
    source => "[ejbca][additional_details]"
    target => "[ejbca][additional]"
    field_split => ";"
    transform_key => "lowercase"
    allow_duplicate_values => false
  }
}
################################### END - EJBCA Audit Messages ###################################

################################### BEGIN - EJBCA Extended Certificate Messages ###################################
# Messages containing THREEKEY_EXTENDED_PUBLISHER_CERTIFICATE are pipe-separated
# certificate records. They are split into [certificate], [ee], [token] and a
# few top-level fields; the epoch-millisecond "long" values are additionally
# converted into date fields.
if [ejbca][message] =~ /THREEKEY_EXTENDED_PUBLISHER_CERTIFICATE/ {
  grok {
    # CUSTOMDATE matches strings shaped like "<day> <month> <monthday> <time> <word> <year>".
    pattern_definitions => { "CUSTOMDATE" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{WORD} %{YEAR}" }
    match => { "[ejbca][message]" => "%{DATA:[type]}\|%{DATA:[admin]}\|%{DATA:[certificate][base64]}\|%{DATA:[certificate][ca][fingerprint][sha1]}\|%{INT:[certificate][profile][id]}\|%{DATA:[certificate][profile][name]}\|%{INT:[ee][profile][id]}\|%{DATA:[ee][profile][name]}\|%{INT:[certificate][expiredate][long]}\|%{CUSTOMDATE:[certificate][expiredate][string]}\|%{DATA:[certificate][fingerprint][sha1]}\|%{DATA:[certificate][issuerdn]}\|%{INT:[certificate][notbefore][long]}\|%{CUSTOMDATE:[certificate][notbefore][string]}\|%{INT:[certificate][publickey][size]}\|%{INT:[certificate][revocationreason][long]}\|%{DATA:[certificate][revocationreason][string]}\|%{DATA:[certificate][revocationreason][text]}\|%{DATA:[certificate][sn]}\|%{DATA:[certificate][signature][algorithm]}\|%{INT:[certificate][status][long]}\|%{DATA:[certificate][status][string]}\|%{DATA:[certificate][sans]}\|%{DATA:[certificate][subjectdn]}\|%{DATA:[certificate][subjectkeyid][base64]}\|%{INT:[certificate][lastupdate][long]}\|%{CUSTOMDATE:[certificate][lastupdate][string]}\|%{INT:[certificate][type][long]}\|%{DATA:[certificate][type][string]}\|%{INT:[certificate][ca][id]}\|%{DATA:[certificate][ca][name]}\|%{DATA:[username]}\|%{INT:[certificate][revocationdate][long]}\|%{CUSTOMDATE:[certificate][revocationdate][string]}\|%{DATA:[extendedinfo]}\|%{INT:[ee][status][long]}\|%{DATA:[ee][status][string]}\|%{INT:[token][type][long]}\|%{DATA:[token][type][string]}\|%{DATA:[ee][type][long]}\|%{WORD:[ee][type][string]}" }
  }

  # Convert each epoch-millisecond value into a proper date field.
  date {
    match => [ "[certificate][expiredate][long]", "UNIX_MS" ]
    target => "[certificate][expiredate][date]"
  }

  date {
    match => [ "[certificate][notbefore][long]", "UNIX_MS" ]
    target => "[certificate][notbefore][date]"
  }

  date {
    match => [ "[certificate][lastupdate][long]", "UNIX_MS" ]
    target => "[certificate][lastupdate][date]"
  }

  date {
    match => [ "[certificate][revocationdate][long]", "UNIX_MS" ]
    target => "[certificate][revocationdate][date]"
  }

  # Expand the subject alternative names into [certificate][san], but only when
  # the raw [certificate][sans] value exists and is not the literal "null".
  # The guard must test the kv *source* field: [certificate][san] is the kv
  # target and does not exist before kv runs, so testing it never skipped anything.
  if [certificate][sans] and [certificate][sans] != "null" {
    kv {
      field_split => ", "
      source => "[certificate][sans]"
      target => "[certificate][san]"
    }
  }

  # Mark the event as an extended certificate record.
  mutate {
    replace => { "log_type" => "extended-certificate-log" }
  }
}
################################### END - EJBCA Extended Certificate Messages ###################################

################################### BEGIN - EJBCA Extended CRL Messages ###################################
# Messages containing THREEKEY_EXTENDED_PUBLISHER_CRL are pipe-separated CRL
# records. They are split into [crl] fields plus [type] and [admin]; the
# epoch-millisecond update times are additionally converted into date fields.
if [ejbca][message] =~ /THREEKEY_EXTENDED_PUBLISHER_CRL/ {
  # NOTE(review): [crl][fingerprint][sha1] is used as the capture name for both
  # the 4th and the last field of this pattern, and the output section uses
  # that field as document_id — confirm whether the first occurrence was meant
  # to have a different name.
  grok {
    match => {
      "[ejbca][message]" => "%{DATA:[type]}\|%{DATA:[admin]}\|%{DATA:[crl][base64]}\|%{DATA:[crl][fingerprint][sha1]}\|%{INT:[crl][delta][indicator]}\|%{INT:[crl][thisupdate][long]}\|%{CUSTOMDATE:[crl][thisupdate][string]}\|%{INT:[crl][nextupdate][long]}\|%{CUSTOMDATE:[crl][nextupdate][string]}\|%{INT:[crl][number]}\|%{DATA:[crl][subjectdn]}\|%{WORD:[crl][fingerprint][sha1]}"
    }
    # CUSTOMDATE matches strings shaped like "<day> <month> <monthday> <time> <word> <year>".
    pattern_definitions => { "CUSTOMDATE" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{WORD} %{YEAR}" }
  }

  # thisUpdate in epoch milliseconds -> date field
  date {
    target => "[crl][thisupdate][date]"
    match => [ "[crl][thisupdate][long]", "UNIX_MS" ]
  }

  # nextUpdate in epoch milliseconds -> date field
  date {
    target => "[crl][nextupdate][date]"
    match => [ "[crl][nextupdate][long]", "UNIX_MS" ]
  }

  # Mark the event as an extended CRL record.
  mutate {
    replace => { "log_type" => "extended-crl-log" }
  }
}
################################### END - EJBCA Extended CRL Messages ###################################

################################### BEGIN - EJBCA OCSP Messages ###################################
# OCSP transaction log: events whose logger is exactly the OCSP TransactionLogger
# carry a semicolon-separated record that is split into session, client, signer,
# issuer and request fields.
if [ejbca][logger] == "org.cesecore.certificates.ocsp.logging.TransactionLogger" {
  grok {
    # CUSTOMTIME matches "<year>-<month>-<day>:<time>:<word>".
    pattern_definitions => { "CUSTOMTIME" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}:%{TIME}:%{WORD}" }
    # The last field uses GREEDYDATA on purpose: grok patterns are not anchored,
    # so a trailing lazy DATA (".*?") would always match the empty string and
    # [forwardedfor][ip] would never be set, leaving the geoip lookup below dead.
    match => { "[ejbca][message]" => "%{DATA:[session][id]};%{INT:[log][id]};%{INT:[status][id]};%{DATA:[client][dn]};%{IP:[client][ip]};%{DATA:[sign][certificate][issuer][dn]};%{DATA:[sign][certificate][subject][name]};%{DATA:[sign][certificate][sn]};%{CUSTOMTIME:[log][time]};%{INT:[reply][time]};%{INT:[process][time]};%{INT:[numberofcertids]};%{DATA:[issuer][dn]};%{DATA:[issuer][hash]};%{DATA:[issuer][key]};%{DATA:[digest][algorithm]};%{DATA:[req][certificate][sn]};%{INT:[req][certificate][status][id]};%{INT:[req][certificate][profile][id]};%{GREEDYDATA:[forwardedfor][ip]}" }
  }

  # Parse the transaction time into [log][date].
  date {
    match => [ "[log][time]", "yyyy-MM-dd:HH:mm:ss:z" ]
    target => "[log][date]"
  }

  # Map the numeric response status to its name. This runs before the mutate
  # below converts the ids to integers, so the string dictionary keys apply.
  translate {
    field => "[status][id]"
    destination => "[status][string]"
    dictionary => [
      "0", "SUCCESSFUL",
      "1", "MALFORMED_REQUEST",
      "2", "INTERNAL_ERROR",
      "3", "TRY_LATER",
      "5", "SIG_REQUIRED",
      "6", "UNAUTHORIZED"
    ]
  }

  # Map the numeric certificate status of the request to its name.
  translate {
    field => "[req][certificate][status][id]"
    destination => "[req][certificate][status][string]"
    dictionary => [
      "0", "good",
      "1", "revoked",
      "2", "unknown"
    ]
  }

  # check if we have client IP information and parse it as geoipdata
  if [client][ip] {
    geoip {
      database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
      source => "[client][ip]"
      target => "[client][geoip]"
    }
  }
  # same lookup for the forwarded-for address, when one was logged
  if [forwardedfor][ip] {
    geoip {
      database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
      source => "[forwardedfor][ip]"
      target => "[forwardedfor][geoip]"
    }
  }

  # Mark the event as an OCSP transaction record and store the numeric fields
  # as integers.
  mutate {
    replace => { "log_type" => "ocsp-transaction-log" }
    convert => {
      "[log][id]" => "integer"
      "[status][id]" => "integer"
      "[reply][time]" => "integer"
      "[process][time]" => "integer"
      "[numberofcertids]" => "integer"
      "[req][certificate][status][id]" => "integer"
    }
  }
}

# OCSP audit log: events whose logger matches the OCSP AuditLogger name carry a
# semicolon-separated record with the request, the response and the status.
# NOTE(review): this test uses "=~" with a string operand, whereas the
# TransactionLogger test uses "==" — confirm whether pattern matching is intended.
if [ejbca][logger] =~ "org.cesecore.certificates.ocsp.logging.AuditLogger" {
  grok {
    match => {
      "[ejbca][message]" => "%{DATA:[session][id]};%{INT:[log][id]};%{CUSTOMTIME:[log][time]};%{INT:[reply][time]};%{INT:[process][time]};%{DATA:[ocsp][request]};%{DATA:[ocsp][response]};%{INT:[status][id]}"
    }
    # CUSTOMTIME matches "<year>-<month>-<day>:<time>:<word>".
    pattern_definitions => { "CUSTOMTIME" => "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}:%{TIME}:%{WORD}" }
  }

  # Parse the audit time into [log][date].
  date {
    target => "[log][date]"
    match => [ "[log][time]", "yyyy-MM-dd:HH:mm:ss:z" ]
  }

  # Map the numeric response status to its name. This runs before the mutate
  # below converts [status][id] to an integer, so the string keys apply.
  translate {
    field => "[status][id]"
    destination => "[status][string]"
    dictionary => [
      "0", "SUCCESSFUL",
      "1", "MALFORMED_REQUEST",
      "2", "INTERNAL_ERROR",
      "3", "TRY_LATER",
      "5", "SIG_REQUIRED",
      "6", "UNAUTHORIZED"
    ]
  }

  # Mark the event as an OCSP audit record and store the numeric fields as
  # integers.
  mutate {
    convert => {
      "[log][id]" => "integer"
      "[status][id]" => "integer"
      "[reply][time]" => "integer"
      "[process][time]" => "integer"
    }
    replace => { "log_type" => "ocsp-audit-log" }
  }
}
################################### END - EJBCA OCSP Messages ###################################
}

################################### For all messages ###################################
# Whenever an event carries [ejbca][additional][cert], hand that value to the
# certificate_java_filter plugin and store its result under [ejbca][certificate].
if [ejbca][additional][cert] {
  certificate_java_filter {
    target => "[ejbca][certificate]"
    source => "[ejbca][additional][cert]"
  }
}

}

# Elasticsearch outputs. Every event is written to the index named in
# [@metadata][index]; extended certificate and CRL records are additionally
# written to their own indices, keyed by SHA-1 fingerprint. All outputs route
# by [tenant][id] and take hosts, CA certificate and credentials from the
# ES_HOSTS, CA_CERT, ES_USER and ES_PASSWORD variables.
output {
  if [@metadata][_id] {
    # An explicit id was supplied in [@metadata][_id]: use it as the document id.
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => "${CA_CERT}"
      #ssl_certificate_verification => false
      #cacert => "/etc/logstash/elasticsearch-ca.pem"

      index => "%{[@metadata][index]}"
      document_id => "%{[@metadata][_id]}"
      routing => "%{[tenant][id]}"

      #manage_template => false
    }
  } else {
    # No explicit id: no document_id is set for this output.
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => "${CA_CERT}"
      #ssl_certificate_verification => false
      #cacert => "/etc/logstash/elasticsearch-ca.pem"

      index => "%{[@metadata][index]}"
      routing => "%{[tenant][id]}"
    }
  }

  # Extended certificate records: one document per certificate fingerprint.
  if [type] == "THREEKEY_EXTENDED_PUBLISHER_CERTIFICATE" {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => "${CA_CERT}"

      index => "dmr-ejbca-extended-certificate-data"
      document_id => "%{[certificate][fingerprint][sha1]}"
      routing => "%{[tenant][id]}"
    }
  }

  # Extended CRL records: one document per CRL fingerprint.
  if [type] == "THREEKEY_EXTENDED_PUBLISHER_CRL" {
    elasticsearch {
      hosts => ["${ES_HOSTS}"]
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => "${CA_CERT}"

      index => "dmr-ejbca-extended-crl-data"
      document_id => "%{[crl][fingerprint][sha1]}"
      routing => "%{[tenant][id]}"
    }
  }
}