Daniel Watrous on Software Engineering

A Collection of Software Problems and Solutions

Posts tagged map reduce

Analyze Tomcat Logs using PIG (hadoop)

In a previous post I illustrated the use of Hadoop to analyze Apache Tomcat log files (catalina.out). Below I perform the same Tomcat log analysis using PIG.

The motivation behind PIG is the ability to use a descriptive language to analyze large sets of data rather than writing code (in Java or Python, for example) to process it. Pig Latin is the descriptive query language, and it shares some features with SQL, such as grouping and filtering.

Load in the data

First I launch into the interactive local PIG command line, grunt. Commands are not case sensitive, but it can be helpful to distinguish function names from variables. I show all commands in CAPS. Since the catalina.out data is not in a structured format (csv, tab, etc.), I load each line as a chararray (string).

[watrous@myhost ~]$ pig -x local
grunt> raw_log_entries = LOAD '/opt/mount/input/sample/catalina.out' USING TextLoader AS (line:chararray);
grunt> illustrate raw_log_entries;
--------------------------------------------------------------------------------------------------------------------------------------------
| raw_log_entries     | line:chararray                                                                                                     |
--------------------------------------------------------------------------------------------------------------------------------------------
|                     | 2013-10-30 04:20:18,897 DEBUG component.JVFunctions  - JVList: got docc03931336Instant Ink 2 - Alert # q14373261 |
--------------------------------------------------------------------------------------------------------------------------------------------

Note that it is also possible to provide a directory, in which case PIG will load all of the files it contains.

Use regular expressions to parse each line

Now that I have the data loaded, I want to split each line into fields. To do this in PIG I use regular expressions with the REGEX_EXTRACT_ALL function. Notice that I double escape regex symbols, such as \\s for space. In the command below, REGEX_EXTRACT_ALL returns the matched groups as a tuple, and FLATTEN un-nests that tuple so the values can be mapped to the fields named in the AS clause. I’m treating all fields as chararray.

grunt> logs_base = FOREACH raw_log_entries GENERATE
>> FLATTEN(
>> REGEX_EXTRACT_ALL(line, '^([0-9]{4}-[0-9]{2}-[0-9]{2}\\s[0-9:,]+)\\s([a-zA-Z]+)\\s+([a-zA-Z0-9.]+)\\s+(.*)$')
>> ) AS (
>> logDate:      chararray,
>> logLevel:     chararray,
>> logClass:     chararray,
>> logMessage:   chararray
>> );
grunt> illustrate logs_base;
-----------------------------------------------------------------------------------------------------------
| raw_log_entries     | line:chararray                                                                    |
-----------------------------------------------------------------------------------------------------------
|                     | 2013-11-08 04:26:27,966 DEBUG component.JVFunctions  - Visible Level Added :LEV1 |
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------
| logs_base     | logDate:chararray       | logLevel:chararray      | logClass:chararray      | logMessage:chararray        |
-----------------------------------------------------------------------------------------------------------------------------
|               | 2013-11-08 04:26:27,966 | DEBUG                   | component.JVFunctions  | - Visible Level Added :LEV1 |
-----------------------------------------------------------------------------------------------------------------------------

Filter, Group and Generate the desired output

I want to report on the ERROR logs by timestamp. I first filter the log base by the logLevel field. I then group the filtered records by logDate. Next I use FOREACH to GENERATE a result set containing each timestamp and a count of the errors at that time. Finally I dump the results.

grunt> filtered_records = FILTER logs_base BY logLevel == 'ERROR';
grunt> grouped_records = GROUP filtered_records BY logDate;
grunt> log_count = FOREACH grouped_records GENERATE group, COUNT(filtered_records);
grunt> dump log_count;
 
HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
1.0.0   0.12.0  watrous 2013-12-05 21:38:54     2013-12-05 21:39:15     GROUP_BY,FILTER
 
Success!
 
Job Stats (time in seconds):
JobId   Alias   Feature Outputs
job_local_0002  filtered_records,grouped_records,log_count,logs_base,raw_log_entries    GROUP_BY,COMBINER       file:/tmp/temp1196141656/tmp-135873072,
 
Input(s):
Successfully read records from: "/opt/mount/input/sample/catalina.out"
 
Output(s):
Successfully stored records in: "file:/tmp/temp1196141656/tmp-135873072"
 
Job DAG:
job_local_0002
 
 
2013-12-05 21:39:15,813 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-12-05 21:39:15,814 [main] WARN  org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2013-12-05 21:39:15,815 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-12-05 21:39:15,815 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(2013-11-08 04:04:51,894,2)
(2013-11-08 05:04:52,711,2)
(2013-11-08 05:33:23,073,3)
(2013-11-08 06:04:53,689,2)
(2013-11-08 07:04:54,366,3)
(2013-11-08 08:04:55,096,2)
(2013-11-08 13:34:28,936,2)
(2013-11-08 17:32:31,629,3)
(2013-11-08 18:50:56,971,1)
(2013-11-08 18:50:56,980,1)
(2013-11-08 18:50:56,986,1)
(2013-11-08 18:50:57,008,1)
(2013-11-08 18:50:57,017,1)
(2013-11-08 18:50:57,024,1)
(2013-11-08 18:51:17,357,1)
(2013-11-08 18:51:17,423,1)
(2013-11-08 18:51:17,491,1)
(2013-11-08 18:51:17,499,1)
(2013-11-08 18:51:17,500,1)
(2013-11-08 18:51:17,502,1)
(2013-11-08 18:51:17,503,1)
(2013-11-08 18:51:17,504,1)
(2013-11-08 18:51:17,506,1)
(2013-11-08 18:51:17,651,6)
(2013-11-08 18:51:17,652,23)
(2013-11-08 18:51:17,653,25)
(2013-11-08 18:51:17,654,19)
(2013-11-08 19:01:13,771,2)
(2013-11-08 21:32:34,522,2)

Performance in PIG

Performance can suffer, since Pig Latin has to be translated into one or more MapReduce jobs, and that translation doesn’t always produce the most efficient plan. However, for smaller datasets, the lower runtime performance may be offset by eliminating the development and build effort required to write your own MapReduce jobs.

Troubleshooting

I spent way more time trying to get PIG working than I felt I should have. The PIG mailing list was very helpful and quick. Here are some pointers.

Agreement of Hadoop version

PIG is compiled against a specific version of Hadoop, so any local Hadoop installation must match the version PIG was built with. If the local Hadoop version doesn’t agree with the version used when building PIG, it’s possible to remove all references to the local Hadoop installation, in which case PIG falls back to its bundled Hadoop libraries. In my case I had to remove the Hadoop binaries from my PATH.

Documentation and examples

There are very few examples showing the use of PIG, and of those that I found, none worked as written. This seems to indicate either that PIG is moving very fast or that the developers are unhappy with the APIs, which change frequently.

References

http://aws.amazon.com/articles/2729
Hadoop: The Definitive Guide

Hadoop Scripts in Python

I read that Hadoop supports scripts written in various languages other than Java, such as Python. Since I’m a fan of python, I wanted to prove this out.

It was my good fortune to find an excellent post by Michael Noll that walked me through the entire process of scripting in Python for Hadoop. It worked as written for me in Hadoop 2.2.0.

How Hadoop processes scripts from other languages (stdin/stdout)

In order to accommodate scripts from other languages, Hadoop communicates over standard in (stdin) and standard out (stdout). Hadoop writes each input line to the mapper script’s stdin; the mapper processes the line and writes its output to stdout. That output is sorted by key and fed to the reducer’s stdin, and the reducer writes its results to stdout, which Hadoop collects as the job output. Hopefully this is easy to see in the scripts below.
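
To make that contract concrete, here is a bare-bones streaming script of my own (not from Noll’s post) that simply echoes each input line back as a key with a count of 1:

#!/usr/bin/env python
# minimal identity mapper: read lines from stdin and write
# tab-separated key/value pairs to stdout
import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        # emit the whole line as the key with a count of 1
        print '%s\t%s' % (line, 1)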

It should be possible to write similar scripts in any language that can read stdin and write to stdout. There may be some performance implications when choosing another language for processing, but from a developer productivity perspective, a language like Python can allow for more rapid development. If you have a process that is CPU bound and would benefit from multiple processors, it may be worth the extra effort to write it in Java.

Revisit Tomcat Logs in Python

I’ll try to add value by revisiting my original example of analyzing Apache Tomcat (Java) logs in Hadoop, this time using Python.

mapper.py

#!/usr/bin/env python
 
import sys
import re
 
log_pattern = r"""([0-9]{4}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\s*([a-zA-Z]+)\s*([a-zA-Z0-9.]+)\s*-\s*(.+)$"""
log_pattern_re = re.compile(log_pattern)
 
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into segments
    try:
        segments = log_pattern_re.search(line).groups()
    except AttributeError:
        # search() returned None, so the line did not match
        segments = None

    if segments is not None and segments[1] == "ERROR":
        # tab delimited, keyed by log date (segments[0])
        print '%s\t%s' % (segments[0], 1)
    else:
        pass

reducer.py

#!/usr/bin/env python
 
import sys

current_key = None
current_count = 0
key = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    key, value = line.split('\t', 1)

    # convert value (currently a string) to int
    try:
        value = int(value)
    except ValueError:
        # value was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: the log timestamp) before it is passed to the reducer
    if current_key == key:
        current_count += value
    else:
        if current_key:
            # write result to STDOUT
            print '%s\t%s' % (current_key, current_count)
        current_count = value
        current_key = key

# do not forget to output the last key if needed!
if current_key == key:
    print '%s\t%s' % (current_key, current_count)

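Before handing the scripts to Hadoop, they can be smoke-tested locally by piping a sample log through mapper.py, sort and reducer.py, which mimics what streaming does. Here is a small sketch of my own; the input path is an assumption based on the earlier post:

#!/usr/bin/env python
# local smoke test: pipe a sample catalina.out through mapper.py, sort
# and reducer.py, mimicking the Hadoop streaming flow
# (the input path is an assumption; both scripts must be executable)
import subprocess

sample = open('/home/watrous/input/catalina.out')
mapper = subprocess.Popen(['./mapper.py'], stdin=sample, stdout=subprocess.PIPE)
sorter = subprocess.Popen(['sort'], stdin=mapper.stdout, stdout=subprocess.PIPE)
reducer = subprocess.Popen(['./reducer.py'], stdin=sorter.stdout, stdout=subprocess.PIPE)
mapper.stdout.close()
sorter.stdout.close()
print reducer.communicate()[0]
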
Running a streaming Hadoop job using Python

[watrous@myhost ~]$ hadoop jar ./hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/watrous/mapper.py -mapper /home/watrous/mapper.py -file /home/watrous/reducer.py -reducer /home/watrous/reducer.py -input /user/watrous/log_myhost.houston.hp.com/* -output /user/watrous/python_output_myhost.houston.hp.com
packageJobJar: [/home/watrous/mapper.py, /home/watrous/reducer.py] [] /tmp/streamjob6730226310886003770.jar tmpDir=null
13/11/18 19:41:09 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
...
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/mapper.py as file:/tmp/hadoop-watrous/mapred/local/1384803670534/mapper.py
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/reducer.py as file:/tmp/hadoop-watrous/mapred/local/1384803670535/reducer.py
...
13/11/18 19:41:11 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./mapper.py]
...
13/11/18 19:43:35 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./reducer.py]
...
13/11/18 19:43:36 INFO mapreduce.Job:  map 100% reduce 100%
13/11/18 19:43:36 INFO mapreduce.Job: Job job_local848014294_0001 completed successfully
13/11/18 19:43:36 INFO mapreduce.Job: Counters: 32
        File System Counters
                FILE: Number of bytes read=291759
                FILE: Number of bytes written=3394292
...

After successfully running the map reduce job, confirm the output files are present and view the results.

[watrous@myhost ~]$ hdfs dfs -ls -R /user/watrous/python_output_myhost.houston.hp.com/
13/11/18 19:45:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   3 watrous supergroup          0 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/_SUCCESS
-rw-r--r--   3 watrous supergroup      10813 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/part-00000
[watrous@myhost ~]$ hdfs dfs -cat /user/watrous/python_output_myhost.houston.hp.com/part-00000 | more
13/11/18 19:45:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-25 06:07:37,108 1
2013-10-25 06:07:37,366 1
2013-10-27 06:08:35,770 1
2013-10-27 06:08:35,777 1
2013-10-27 06:08:35,783 1
2013-10-27 06:09:33,748 11
2013-10-27 06:09:33,749 25
2013-10-27 06:09:33,750 26
2013-10-27 06:09:33,751 14
2013-10-27 07:07:58,383 5

Use Hadoop to Analyze Java Logs (Tomcat catalina.out)

One of the Java applications I develop deploys in Tomcat and is load-balanced across a couple dozen servers. Each server can produce gigabytes of log output daily due to the high volume. This post demonstrates simple use of hadoop to quickly extract useful and relevant information from catalina.out files using Map Reduce. I followed Hadoop: The Definitive Guide for setup and example code.

Installing Hadoop

Hadoop in standalone mode was the most convenient for initial development of the Map Reduce classes. The following commands were executed on a virtual server running RedHat Enterprise Linux 6.3. First verify Java 6 is installed:

[watrous@myhost ~]$ java -version
java version "1.6.0_24"
OpenJDK Runtime Environment (IcedTea6 1.11.5) (rhel-1.50.1.11.5.el6_3-x86_64)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

Next, download and extract Hadoop, which is available from any of the mirrors. It can be set up and run locally and does not require any special privileges. Always verify the checksum to confirm you have a good download.

[watrous@myhost ~]$ wget http://download.nextag.com/apache/hadoop/common/stable/hadoop-2.2.0.tar.gz
[watrous@myhost ~]$ md5sum hadoop-2.2.0.tar.gz
25f27eb0b5617e47c032319c0bfd9962  hadoop-2.2.0.tar.gz
[watrous@myhost ~]$ tar xzf hadoop-2.2.0.tar.gz
[watrous@myhost ~]$ hdfs namenode -format

That last command creates an HDFS file system in the tmp folder. In my case it was created here: /tmp/hadoop-watrous/dfs/.

Environment variables for JAVA_HOME and HADOOP_INSTALL were added to .bash_profile, as shown below. These can also be exported in the shell each time you log in.

export JAVA_HOME=/usr/lib/jvm/jre
export HADOOP_INSTALL=/home/watrous/hadoop-2.2.0
export PATH=$PATH:$HADOOP_INSTALL/bin

I can now verify that Hadoop is installed and ready to run.

[watrous@myhost ~]$ hadoop version
Hadoop 2.2.0
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4
This command was run using /home/watrous/hadoop-2.2.0/share/hadoop/common/hadoop-common-2.2.0.jar

Get some seed data

Now that Hadoop is all set up, I need some seed data to operate on. For this I just reached out and grabbed a log file from one of my production servers.

[watrous@myhost ~]$ mkdir input
[watrous@myhost ~]$ scp watrous@mywebhost.com:/var/lib/tomcat/logs/catalina.out ./input/

Creating Map Reduce Classes

The simplest operation in Hadoop requires a Mapper class, a Reducer class and a third (driver) class that identifies the Mapper and Reducer, including the datatypes that connect them. The examples below require two jars from the extracted release downloaded above:

  • share/hadoop/common/hadoop-common-2.2.0.jar
  • share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar

I also use regular expressions in Java to analyze each line in the log. Regular expressions can be more resilient to variations and allow for grouping, which gives easy access to specific data elements. As always, I used Kodos to develop the regular expression.
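
As a quick sanity check (a sketch added here, not part of the original post), the pattern can be exercised in a Python shell against the example log line that appears in the Mapper comments below. The Python and Java regex syntax are close enough here that the groups line up.

import re

# same pattern as in the Mapper class, written as a Python raw string
pattern = r'([0-9]{4}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\s*([a-zA-Z]+)\s*([a-zA-Z.]+)\s*-\s*(.+)$'
line = '2013-11-08 04:06:56,586 DEBUG component.helpers.GenericSOAPConnector  - Attempting to connect to: https://remotehost.com/app/rfc/entry/msg_status'

m = re.search(pattern, line)
if m:
    print m.group(1)  # date
    print m.group(2)  # log level
    print m.group(3)  # class
    print m.group(4)  # message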

In the example below, I don’t actually use the log value, but instead I just count up how many occurrences there are by key.

Mapper class

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class TomcatLogErrorMapper extends Mapper<LongWritable, Text, Text, Text> {
 
    String pattern = "([0-9]{4}-[0-9]{2}-[0-9]{2}\\s[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\\s*([a-zA-Z]+)\\s*([a-zA-Z.]+)\\s*-\\s*(.+)$";
    // Create a Pattern object
    Pattern r = Pattern.compile(pattern);
 
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
 
        Matcher m = r.matcher(line);
        if (m.find()) {
            // only consider ERRORs for this example
            if (m.group(2).contains("ERROR")) {
                  // example log line
                  // 2013-11-08 04:06:56,586 DEBUG component.helpers.GenericSOAPConnector  - Attempting to connect to: https://remotehost.com/app/rfc/entry/msg_status
                // System.out.println("Found value: " + m.group(0)); // complete line
                // System.out.println("Found value: " + m.group(1)); // date
                // System.out.println("Found value: " + m.group(2)); // log level
                // System.out.println("Found value: " + m.group(3)); // class
                // System.out.println("Found value: " + m.group(4)); // message
                context.write(new Text(m.group(1)), new Text(m.group(2) + m.group(3) + m.group(4)));
            }
        }
    }
}

Reducer class

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class TomcatLogErrorReducer extends Reducer<Text, Text, Text, IntWritable> {
 
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int countValue = 0;
        for (Text value : values) {
            countValue++;
        }
        context.write(key, new IntWritable(countValue));
    }
}

Job class with main

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class TomcatLogError {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TomcatLogError <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(TomcatLogError.class);
        job.setJobName("Tomcat Log Error");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(TomcatLogErrorMapper.class);
        job.setReducerClass(TomcatLogErrorReducer.class);
        // map output types (Text, Text) differ from the final output types (Text, IntWritable)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Running Hadoop

In NetBeans I made sure that the Main Class of the compiled jar was TomcatLogError. I then ran Clean and Build to produce a jar, which I transferred to the server where I installed Hadoop.

[watrous@myhost ~]$ hadoop jar HadoopExample.jar input/catalina.out ~/output
13/11/11 19:20:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/11 19:20:52 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
...
13/11/11 18:36:57 INFO mapreduce.Job: Job job_local1725513594_0001 completed successfully
13/11/11 18:36:57 INFO mapreduce.Job: Counters: 27
        File System Counters
                FILE: Number of bytes read=430339145
                FILE: Number of bytes written=1057396
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=1101516
                Map output records=105
                Map output bytes=20648
                Map output materialized bytes=20968
                Input split bytes=396
                Combine input records=0
                Combine output records=0
                Reduce input groups=23
                Reduce shuffle bytes=0
                Reduce input records=105
                Reduce output records=23
                Spilled Records=210
                Shuffled Maps =0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=234
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=1827143680
        File Input Format Counters
                Bytes Read=114455257
        File Output Format Counters
                Bytes Written=844

The output folder now contains a file named part-r-00000 with the results of the processing.

[watrous@c0003913 ~]$ more output/part-r-00000
2013-11-08 04:04:51,894 2
2013-11-08 05:04:52,711 2
2013-11-08 05:33:23,073 3
2013-11-08 06:04:53,689 2
2013-11-08 07:04:54,366 3
2013-11-08 08:04:55,096 2
2013-11-08 13:34:28,936 2
2013-11-08 17:32:31,629 3
2013-11-08 18:51:17,357 1
2013-11-08 18:51:17,423 1
2013-11-08 18:51:17,491 1
2013-11-08 18:51:17,499 1
2013-11-08 18:51:17,500 1
2013-11-08 18:51:17,502 1
2013-11-08 18:51:17,503 1
2013-11-08 18:51:17,504 1
2013-11-08 18:51:17,506 1
2013-11-08 18:51:17,651 6
2013-11-08 18:51:17,652 23
2013-11-08 18:51:17,653 25
2013-11-08 18:51:17,654 19
2013-11-08 19:01:13,771 2
2013-11-08 21:32:34,522 2

Based on this analysis, a number of errors were produced around 18:51:17. Now that I know when the errors happened, it is easy to change the Mapper class to emit a different key, such as the class or message, to identify more precisely what the errors are.

Increasing scale

The Mapper and Reducer classes can be enhanced to give more relevant details. The process of transferring the files can also be automated and the input method can be adapted to walk a directory, rather than a single file. Reports can also be aggregated and placed in a web directory or emailed.

MongoDB Aggregation for Analytics

I’ve been working on generating analytics based on a collection containing statistical data. My previous attempt involved using Map Reduce in MongoDB. Recall that the data in the statistics collection has this form.

{
        "_id" : ObjectId("5e6877a516832a9c8fe89ca9"),
        "apikey" : "7e78ed1525b7568c2316576f2b265f55e6848b5830db4e6586283",
        "request_date" : ISODate("2013-04-05T06:00:24.006Z"),
        "request_method" : "POST",
        "document" : {
                "domain" : "",
                "validationMethod" : "LICENSE_EXISTS_NOT_EXPIRED",
                "deleted" : null,
                "ipAddress" : "",
                "disposition" : "",
                "owner" : ObjectId("af1459ed793eca35754090a0"),
                "_id" : ObjectId("6fec518787a52a9c988ea683"),
                "issueDate" : ISODate("2013-04-05T06:00:24.005Z"),
        },
        "request_uri" : {
                "path" : "/v1/sitelicenses",
                "netloc" : "api.easysoftwarelicensing.com"
        }
}

There were a few items that kept getting in the way with the map reduce implementation. In particular, the complexity of the objects I was trying to emit and then reduce was causing me some headaches. Someone suggested using the Aggregation Framework in MongoDB. Here’s what I came up with.

db.statistics.aggregate({
    $match: {
        owner: ObjectId("5143b2c8136b9616343da222")
    }
}, {
    $project: {
        owner: "$owner",
        action_1: {
            $cond: [{$eq: ["$apikey", null]},0, 1]
        },
        action_2: {
            $cond: [{$ne: ["$apikey", null]},0, 1]
        }
    }
}, {
    $group: {
        _id: "$owner",
        action_1: {$sum: "$action_1"},
        action_2: {$sum: "$action_2"}
    }
}, {
    $project: {
        action_1: "$action_1",
        action_2: "$action_2",
        actions_total: {
            $add: ["$action_1", "$action_2"]
        },
        actions_per_day: {
            $divide: [
                {$add: ["$action_1", "$action_2"]}, 
                {$dayOfMonth: new Date()}
            ]
        },
    }
})

At first all the discussion of the aggregation pipeline felt awkward, but after a while it became clearer. For example, the pipeline above does the following:

  • $match limits me to the set of documents associated with a particular owner
  • $project creates a new document conditionally including some data from the documents that were matched
  • $group then sums those documents that were projected above
  • the final $project performs additional calculations with the grouped (summed) data

The output of the above aggregation is a document that looks like this:

{
        "result" : [
                {
                        "_id" : ObjectId("5136d880136b961c98c9a62f"),
                        "action_1" : 10,
                        "action_2" : 4,
                        "actions_total" : 14,
                        "actions_per_day" : 1.4
                }
        ],
        "ok" : 1
}
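
The same pipeline can also be run from application code. Below is a rough pymongo sketch of my own; the database name is an assumption and the post itself only uses the mongo shell.

# pymongo version of the aggregation above (my own sketch; 'saas' is a hypothetical database name)
import datetime
from bson import ObjectId
from pymongo import MongoClient

db = MongoClient()['saas']

pipeline = [
    {'$match': {'owner': ObjectId('5143b2c8136b9616343da222')}},
    {'$project': {
        'owner': '$owner',
        'action_1': {'$cond': [{'$eq': ['$apikey', None]}, 0, 1]},
        'action_2': {'$cond': [{'$ne': ['$apikey', None]}, 0, 1]},
    }},
    {'$group': {
        '_id': '$owner',
        'action_1': {'$sum': '$action_1'},
        'action_2': {'$sum': '$action_2'},
    }},
    {'$project': {
        'action_1': '$action_1',
        'action_2': '$action_2',
        'actions_total': {'$add': ['$action_1', '$action_2']},
        'actions_per_day': {'$divide': [
            {'$add': ['$action_1', '$action_2']},
            {'$dayOfMonth': datetime.datetime.utcnow()},
        ]},
    }},
]

# older pymongo versions return a dict with a 'result' key;
# newer versions return a cursor over the result documents
print db.statistics.aggregate(pipeline)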

So far I’ve only run this on small sets of data, so I can’t comment on performance for large data sets. Since as of right now it’s still not possible to cache the results in a separate collection, performance may become an issue as my statistical data set grows.

Reference

http://stackoverflow.com/questions/16455528/mongodb-aggregation-framework-date-now

MongoDB Map Reduce for Analytics

I created a RESTful SaaS service that uses MongoDB. Each REST call creates a new record in a statistics collection. In order to implement quotas and provide user analytics, I need to process the statistics collection periodically and generate meaningful analytics specific to each user.

This is just the type of problem map reduce was meant to solve. In order to accomplish this I’ll need to do the following:

  • Map all statistics records over a time range
  • Reduce to a count of calls, both authenticated and anonymous
  • Finalize to get the sum of authenticated and anonymous calls as total
  • Run over a time range

The data in the statistics collection has this form:

{
        "_id" : ObjectId("5e6877a516832a9c8fe89ca9"),
        "apikey" : "7e78ed1525b7568c2316576f2b265f55e6848b5830db4e6586283",
        "request_date" : ISODate("2013-04-05T06:00:24.006Z"),
        "request_method" : "POST",
        "document" : {
                "domain" : "",
                "validationMethod" : "LICENSE_EXISTS_NOT_EXPIRED",
                "deleted" : null,
                "ipAddress" : "",
                "disposition" : "",
                "owner" : ObjectId("af1459ed793eca35754090a0"),
                "_id" : ObjectId("6fec518787a52a9c988ea683"),
                "issueDate" : ISODate("2013-04-05T06:00:24.005Z"),
        },
        "request_uri" : {
                "path" : "/v1/sitelicenses",
                "netloc" : "api.easysoftwarelicensing.com"
        }
}

Here is what I came up with:

Map function

var map_analytics = function() {
    var key = this.owner;
    if (this.apikey == null) {
        var value = {api_call_with_key: 0, api_call_without_key: 1};
    } else {
        var value = {api_call_with_key: 1, api_call_without_key: 0};
    }
    emit(key, value);
};

Reduce function

var reduce_analytics = function(key_owner, api_calls) {
    var reduced_val = {api_call_with_key: 0, api_call_without_key: 0};
    api_calls.forEach(function(value) {
        reduced_val.api_call_with_key += value.api_call_with_key;
        reduced_val.api_call_without_key += value.api_call_without_key;
    });
    return reduced_val;
};

Finalize function

var finalize_analytics = function (key, reduced_val) {
    reduced_val.total_api_calls = reduced_val.api_call_with_key + reduced_val.api_call_without_key;
    return reduced_val;
};

Run Map Reduce

db.statistics.mapReduce(map_analytics, reduce_analytics, {out: { reduce: "analytics" }, query: { request_date: { $gt: new Date('01/01/2012')}}, finalize: finalize_analytics })

That produces an analytics collection whose ObjectIDs match the user’s _id in the administrators collection. It looks like this.

> db.statistics.mapReduce(map_analytics, reduce_analytics, {out: { reduce: "analytics" }, query: { request_date: { $gt: new Date('01/01/2012')}}, finalize: finalize_analytics })
{
        "result" : "analytics",
        "timeMillis" : 79,
        "counts" : {
                "input" : 14,
                "emit" : 14,
                "reduce" : 2,
                "output" : 2
        },
        "ok" : 1,
}
> db.analytics.find().pretty()
{
        "_id" : ObjectId("5136d880136b961c98c9a62f"),
        "value" : {
                "api_call_with_key" : 8,
                "api_call_without_key" : 4,
                "total_api_calls" : 12
        }
}
{
        "_id" : ObjectId("5143b2c8136b9616343dacec"),
        "value" : {
                "api_call_with_key" : 0,
                "api_call_without_key" : 2,
                "total_api_calls" : 2
        }
}

I had originally hoped to write the analytics to the administrator document, but I don’t think that’s possible, since it overwrites the document with the result of the reduce/finalize functions.

I got my inspiration from this example.

Storing and Scheduling

The question remains how best to store and then schedule the periodic running of this map reduce functionality. It seems that storing it is best done on the server, as shown here: http://docs.mongodb.org/manual/tutorial/store-javascript-function-on-server/

Scheduling will most likely involve a crontab. I’m not sure yet whether I’ll call the map reduce directly or through a Python script; a rough sketch of the Python option follows.
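
Below is one way that script might look, using pymongo. This is my own sketch: the database name and the idea of keeping the three functions in separate .js files next to the script are assumptions, not something from the post.

#!/usr/bin/env python
# cron-driven re-run of the map reduce using pymongo
# (the 'saas' database name and the .js file layout are assumptions)
import datetime
from bson.code import Code
from pymongo import MongoClient

db = MongoClient()['saas']


def load(name):
    # each .js file holds just the function expression, i.e. everything
    # after the '=' in the definitions shown above
    return Code(open(name).read())

db.statistics.map_reduce(
    load('map_analytics.js'),
    load('reduce_analytics.js'),
    out={'reduce': 'analytics'},
    query={'request_date': {'$gt': datetime.datetime(2012, 1, 1)}},
    finalize=load('finalize_analytics.js'))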