How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue
Cloudera recently announced support for Apache Kafka. This simple use case illustrates how to make web log analysis, powered in part by Kafka, one of your first steps in a pervasive analytics journey.
If you are not looking at your company’s operational logs, then you are at a competitive disadvantage in your industry. Web server logs, application logs, and system logs are all valuable sources of operational intelligence, uncovering potential revenue opportunities and helping drive down costs. Whether your firm is an advertising agency that analyzes clickstream logs for customer insight, or you are responsible for protecting the firm’s information assets, you should strive to get the most value from your data as soon as possible.
In the past, it was cost-prohibitive to capture all logs, let alone implement systems that act on them intelligently in real time. Recently, however, technology has matured quite a bit and, today, we have all the right ingredients in the Apache Hadoop ecosystem to capture events in real time, process them, and make intelligent decisions based on that information.
In this post, you will explore a sample implementation of a system that can capture Apache HTTP Server logs in real time, index them for searching, and make them available to other analytic apps as part of a “pervasive analytics” approach. This implementation is based on open source components such as Apache Flume, Apache Kafka, Hue, and Apache Solr.
Flume, Solr, Hue, and Kafka can all be easily installed using Cloudera Manager and parcels (the first three via the CDH parcel, and Kafka via its own parcel).
Architecture
The high-level diagram below illustrates a simple setup that you can deploy in a matter of minutes. For our purposes, Apache web server log events originate in syslog. They are then forwarded to a Flume agent via the Flume syslog source. The syslog source sends them to a Kafka channel, which in turn passes them to a MorphlineSolrSink. The MorphlineSolrSink parses the messages, converts them into Solr documents, and sends them to the Solr server. After the indexed documents appear in Solr, Hue’s Search application is used to search the indexes and to build and display multiple unique dashboards for various audiences.
Next, you will learn all the details behind the setup above.
Apache Logs Breakdown
Every time you start a new project that involves Solr, you must first understand your data and organize it into fields. Fortunately, Apache web server logs are easy enough to understand and relate to Solr documents. A sample of the logs can be found below:
122.248.234.23 - - [15/Dec/2014:06:39:51 +0000] "GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" "Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)" 55006
170.248.46.113 - - [15/Dec/2014:06:39:54 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519197 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 37789
170.248.46.113 - - [15/Dec/2014:06:39:55 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519198 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 28120
The diagram below represents a simple view of how to organize raw Apache web server messages into Solr fields:
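As a rough illustration of that mapping, the first sample line above breaks down into the fields defined later in schema.xml roughly as follows (the derived fields such as app, device_family, and the geo fields are produced later by the morphline, and the trailing number on each log line is not mapped here):

client_ip  : 122.248.234.23
time       : 15/Dec/2014:06:39:51 +0000
request    : GET /accounts/login/?next=/ HTTP/1.1
method     : GET
url        : /accounts/login/?next=/
protocol   : HTTP/1.1
code       : 302
bytes      : 460
referer    : -
user_agent : Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)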
There’s Something About the Cloud
Cloudera Search, which integrates Solr with HDFS, is deployed in SolrCloud mode with all the options and flexibility that come with integrating with the rest of the Hadoop ecosystem in CDH. Throughout this example, you will use the solrctl command to manage SolrCloud deployments. (See the Cloudera Search documentation for the full solrctl command reference.)
Let’s begin by generating template configuration files for Solr. The most important file, and the only one you need to update, is schema.xml, which is used in Solr to define the fields in the collection, their types, and their indexing characteristics. The command below generates a conf directory with all the configuration files in the $HOME/accessCollection folder:
solrctl --zk localhost:2181/solr instancedir --generate $HOME/accessCollection
(Please note that --zk localhost:2181 should be replaced with the address and port of your own Apache ZooKeeper quorum.)
Next, edit the schema.xml file. What follows is a brief overview of what was changed from the template generated above. The fields relevant to the Apache logs have to be defined in the schema file:
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="time" type="tdate" indexed="true" stored="true" />
<field name="record" type="text_general" indexed="true" stored="false" multiValued="true"/>
<field name="client_ip" type="string" indexed="true" stored="true" />
<field name="code" type="string" indexed="true" stored="true" />
<field name="user_agent" type="string" indexed="true" stored="true" />
<field name="protocol" type="string" indexed="true" stored="true" />
<field name="url" type="string" indexed="true" stored="true" />
<field name="request" type="string" indexed="true" stored="true" />
<field name="referer" type="string" indexed="true" stored="true" />
<field name="bytes" type="tint" indexed="true" stored="true" />
<field name="method" type="string" indexed="true" stored="true" />
<field name="extension" type="string" indexed="true" stored="true" />
<field name="app" type="string" indexed="true" stored="true" />
<field name="subapp" type="string" indexed="true" stored="true" />
<field name="device_family" type="string" indexed="true" stored="true" />
<field name="user_agent_major" type="string" indexed="true" stored="true" />
<field name="user_agent_family" type="string" indexed="true" stored="true" />
<field name="os_family" type="string" indexed="true" stored="true" />
<field name="os_major" type="string" indexed="true" stored="true" />
<field name="region_code" type="string" indexed="true" stored="true" />
<field name="country_code" type="string" indexed="true" stored="true" />
<field name="city" type="string" indexed="true" stored="true" />
<field name="latitude" type="float" indexed="true" stored="true" />
<field name="longitude" type="float" indexed="true" stored="true" />
<field name="country_name" type="string" indexed="true" stored="true" />
<field name="country_code3" type="string" indexed="true" stored="true" />
Although you are not using the id and _version_ fields in this application, Solr uses them internally for its own bookkeeping. Therefore, every collection must have them (as defined in the schema.xml file).
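Relatedly, the generated template also declares the collection’s unique key, which should continue to point at the id field:

<uniqueKey>id</uniqueKey>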
One very important concept in SolrCloud deployments is the notion of collections. A collection is a single index that spans multiple Solr Instances. For example, if your syslog index is distributed across multiple Solr Instances, they all add up to form one collection.
Let’s call our collection accessCollection and set it up using the commands below. The first command uploads all of the configurations into a ZooKeeper znode. The second command creates the collection in Solr, based on the configuration in ZooKeeper from the first command.
solrctl --zk localhost:2181/solr instancedir --create accessCollection $HOME/accessCollection
solrctl --zk localhost:2181/solr collection --create accessCollection -s 1
Again, replace --zk localhost:2181 with your own ZooKeeper quorum configuration in both statements.
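If you want to double-check that the instance directory was uploaded and the collection was created, solrctl can list both. A quick sketch, assuming the same ZooKeeper address as above:

solrctl --zk localhost:2181/solr instancedir --list
solrctl --zk localhost:2181/solr collection --list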
Note that the -s 1 argument defines the number of shards. A shard is a very important concept in Solr that refers to a slice of an index. For example, if you have a corpus of one million events, you may want to split it into two shards for scalability and improved query performance. The first shard might handle all the documents that have an id between 0 and 500,000, and the second shard would handle documents with an id between 500,000 and 1,000,000. Solr handles all this logic internally; you need only specify the number of shards you would like to create with the -s option.
The number of shards will depend on many factors, such as the total index size and the expected query and indexing load, and should be determined carefully.
How Flume Met Kafka
Before you index the logs for searching, you need to collect them from the application servers.
Flume is a distributed system for collecting and processing log data. One of the main advantages of Flume is its large collection of sources and sinks. In many cases, Flume makes integration a no-brainer.
As previously described, our example uses Flume with a syslog source to collect the log data from syslog, Kafka as a distributed and highly available channel to store the log data, and a Solr sink with morphlines to index the data and store it in Cloudera Search. All this can be done by properly configuring Flume, without writing a line of code; the relevant pieces of the configuration are shown below.
There are three components in the configuration:
- First, a syslog source, configured with the host and port to which it will bind.
# Syslog Source Configuration
tier1.sources.source1.type = syslogtcp
# the hostname that Flume Syslog source will be running on
tier1.sources.source1.host = localhost
# the port that Flume Syslog source will listen on
tier1.sources.source1.port = 5040
- Next, a Solr sink, configured with a configuration file that we’ll review in detail later.
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
- Finally, a Kafka channel in between them.
tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList = kafkaf-2:9092,kafkaf-3:9092
tier1.channels.channel1.topic = channel1
tier1.channels.channel1.zookeeperConnect = kafkaf-1:2181
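For these three snippets to form a complete agent definition, the source, channel, and sink also have to be declared and wired together at the top of the same properties file. A minimal sketch, using the component names from the snippets above:

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

# attach the syslog source and the Solr sink to the Kafka channel
tier1.sources.source1.channels = channel1
tier1.sinks.sink1.channel = channel1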
The Kafka channel requires two mandatory parameters:
- Location of at least one, but preferably two or more Kafka brokers
- Location of the ZooKeeper quorum that Kafka uses
There are also a few optional parameters:
- topic – specifies which topic the channel will use. It’s important to set it correctly if you expect Flume to read data that other apps wrote to Kafka: the topic should match between those apps and the Kafka channel configuration. The default topic is flume-channel.
- groupId – if multiple Kafka channels share the same groupId and the same topic, they will each get partial data from the topic. You can therefore use this setting to add scalability via multiple Flume agents, each with a Kafka channel configured with the same groupId. If, on the other hand, you need multiple channels that all receive all the data from Kafka (essentially duplicating all the data), use different groupIds or different topics. The default groupId is flume.
- transactionCapacity – the number of events the channel processes in one transaction. Setting this parameter to a higher number can increase throughput, but latency as well.
- parseAsFlumeEvent – a setting of “true” assumes that all events in the Kafka topic were written by a Flume source or a Flume client; the first time the channel starts, all events in the topic are read (subsequently, only events after the last recorded position are read). A setting of “false” assumes that some other application wrote the events to Kafka, so they are not parsed as Flume events; in addition, only events written after the channel started are read (since the topic may already have a large history in it).
All this is nice if the data is arriving from syslog and going only to Solr by way of Morphlines, but in today’s enterprise IT, there are usually many different data sources. In many companies, applications write important events directly to Kafka without going through syslog or Log4J at all.
To get data from Kafka, parse it with Morphlines, and index it into Solr, you can use an almost identical configuration. The only changes required are:
- Leave out the Syslog source.
- When configuring the Kafka channel, specify parseAsFlumeEvent = false.
These changes are necessary because the events are now written to Kafka by apps other than Flume, so the source is not needed (the Kafka channel will deliver events from Kafka to the Solr sink) and the events in the channel can be of any data type, not necessarily a FlumeEvent.
This configuration allows any enterprise event written to Kafka (including logs, metrics, audit events, and so on) to be indexed and made searchable with Cloudera Search.
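Putting those two changes together, one way the Kafka-to-Solr variant could look is sketched below; the broker, ZooKeeper, topic, and morphline path values are carried over from the earlier examples as placeholders, and the topic should be set to whatever your applications write to.

# no source is defined; the Kafka channel itself pulls events that other
# applications already wrote to the topic
tier1.channels = channel1
tier1.sinks = sink1

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = kafkaf-2:9092,kafkaf-3:9092
tier1.channels.channel1.zookeeperConnect = kafkaf-1:2181
tier1.channels.channel1.topic = channel1
# events were not written by Flume, so do not parse them as FlumeEvents
tier1.channels.channel1.parseAsFlumeEvent = false

tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
tier1.sinks.sink1.channel = channel1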
ETL Coding with Kite Morphlines
A “morphline” is a rich configuration file that makes it easy to define a transformation chain that can consume any kind of data from any kind of data source, process that data, and load the results into a Hadoop component. Apache log parsing is achieved with the help of the morphlines library, an open source framework available through the Kite SDK, which lets you define a transformation chain without writing a single line of code.
Our morphline configuration will break down raw Apache logs and generate the Solr fields that will be used for indexing. The morphline will perform the following actions (a simplified sketch follows the list):
- Read the logs with the readCSV command, using space as a separator
- Use the split command to break up the request field into three parts: method, url, protocol
- Use the split command to extract the app and subapp fields from the url field
- Use the userAgent command to extract all of the device, OS, and user agent information
- Use the geoIP and extractJsonPaths commands to retrieve geo coordinates such as country, region, city, latitude, and longitude by doing a lookup against an efficient in-memory MaxMind database (the GeoIP database therefore needs to be downloaded from MaxMind)
- Generate a unique ID for every log event with the generateUUID command
- Convert the date/timestamp into a field that Solr will understand, with the convertTimestamp command
- Drop all of the extra fields that we did not specify in schema.xml, with the sanitizeUnknownSolrFields command, and
- Load the record into Solr for the HDFS write, with the loadSolr command
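To make that chain concrete, here is a heavily simplified, illustrative morphline along those lines. The column list, the SOLR_LOCATOR values, and the omission of the userAgent, geoIP, extractJsonPaths, and convertTimestamp steps are simplifying assumptions for brevity, not the exact configuration used in this example:

SOLR_LOCATOR : {
  collection : accessCollection
  zkHost : "localhost:2181/solr"
}
morphlines : [
  {
    id : accessLogMorphline
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      {
        # split each raw log line on spaces; the quoted request, referer, and
        # user agent fields stay intact thanks to quoteChar
        readCSV {
          separator : " "
          columns : [client_ip, ident, auth, time, time_offset, request, code, bytes, referer, user_agent, duration]
          quoteChar : "\""
          charset : UTF-8
        }
      }
      {
        # break the request field into method, url, and protocol
        split {
          inputField : request
          outputFields : [method, url, protocol]
          separator : " "
          isRegex : false
        }
      }
      # give every event a unique Solr document id
      { generateUUID { field : id } }
      # drop anything not declared in schema.xml, then index into Solr
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]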
When building this example, we initially used three morphline commands to break up the Apache log event: readCSV, split, and split. Our intention was to keep this post generic and to demonstrate how easily it can be adapted to different types of logs. However, the creators of the morphlines library have generously provided a number of pre-defined grok patterns, including ones for Apache web server logs. What follows is an alternative way of reading the Apache log events and breaking them up into fields via morphlines:
{
readLine {
ignoreFirstLine : true
commentPrefix : "#"
charset : UTF-8
}
}
{
grok {
dictionaryFiles : [target/test-classes/grok-dictionaries]
expressions : {
message : """<%{COMBINEDAPACHELOG:apache_log}>"""
}
extract : inplace
findSubstrings : false
addEmptyStrings : false
}
}
Picture Perfect with Hue Dashboards
Now that your logs are indexed in near real time, you need a dashboard to search and drill into the events. The best tool for this job is Hue, the open source GUI for Hadoop, which comes preloaded with a Search application.
With just a few simple clicks, we can generate a nice dashboard, and present it to the end user.
Start by clicking the Search->Indexes menu item, then click on Dashboards and Create. You will then see a new dashboard template window such as the one below; to get going, click the little pencil (Edit) button.
Next, choose how to present the search results on the dashboard. For our demo, we chose Grid Layout, but if you are handy with HTML you can choose an HTML layout and present the results in a sexier manner.
The next step is where all fun begins: You can drag and drop different widgets on the screen and assign them to the fields in the index. At the moment, Hue Search Dashboards support the following widgets:
- Filter Bar
- Marker Map
- Text Facet
- Pie Chart
- Bar Chart
- Line Chart
- Tree
- Heatmap
- Timeline
- Gradient Map
For a full reference on how to build your own dashboards, see the Hue Search documentation.
For our Apache logs demo, we used pie charts to give users the ability to drill into Application, Region, and Operating System facets. Text facets allow users to drill into country and city. A timeline view provides a nice graphical view of when users accessed our website. Finally, a marker map visually displays geo locations from which users accessed our example website.
Conclusion
Although the main example in this post describes a use case involving Apache web server logs, you could just as easily use the same components for any type of log/event processing. For an information security use case, processing proxy and firewall logs in real time can go a long way toward stopping external attacks and preventing insider threats. For an insurance company, processing claims and making them searchable to adjusters and fraud analysts can decrease time to resolution.
Whatever the use case, the ongoing investment in pervasive analytics is key.
Gwen Shapira is a Software Engineer at Cloudera, working on the Data Ingest team.
Jeff Shmain is a Solutions Architect at Cloudera.