Map/Reduce diff(1)

This has sadly been a draft for years, so time to release it…

diff(1)

For those who use Unix, you have likely come across two files and wanted to see what was different between the two. Certainly, one can compare size (highly inaccurate), use a hash function (if a strong cryptographic hash, it will be accurate — but very information free) or one can use the obvious choice, diff(1). One usually gets output like

$ cat << EOF > one
foo
blah
baz raz
has
EOF
$ cat << EOF > two
blah
yar raz
has
EOF
$ diff one two
1d0
< foo
3c2
< baz raz
---
> yar raz

Here we see that the left file (file one) has an extra entry on line one and line three differs between the two files. Further, we can see that the algorithm matched lines, as blah was matched between the files despite the leading foo in file one.

Map/Reduce

Map/Reduce gained visibility after Google’s initial publication and certainly now that Hadoop has gained significant adoption. For my work, I mostly use Apache Pig which is a high-level language which compiles down to a map/reduce plan and runs on Hadoop Map/Reduce, Apache Tez and Apache Spark.

There are UDF approaches (such as the Pig built-in DIFF). The built-in DIFF does have one flaw for this work, in that it only accepts two bags (non-repeating, unordered data-structure) and as each set of data would be a bag, each file must fit into a container’s memory — not something efficient for differencing two large files.

For implementing code to generate a difference, I settled on two easy ways easy ways to operate. One was a UNION based approach, the other was a JOIN based approach. This allowed me to get the data from each file in one Pig data-structure (a relation), however, the approaches differ dramatically in row size of the relation.

Despite data size differences the run time performance a number of years ago was roughly parallel using Hadoop Map/Reduce. I found on 2012 hardware it took 10 minutes to difference over 200GB (1,055,687,930 rows) using LZO compressed input with 18 nodes. Further, each approach only takes one Map/Reduce cycle.

Also, one has to decide the quality of diff one would like; options range from line-numbers enumerating the records (lines) before the join if one were beginning a context-diff implementation to something as simple as should a match be reported or the count of matches (if a line is duplicated in a single source).

Simply, unlike the Unix diff(1) tool, order is not important; effectively the JOIN approach performs sort -u <foo.txt> | diff while UNION performs sort <foo> | diff.

Implementation

UNION

The UNION operator in Pig is like the SQL UNION operator. For differencing, one only needs to augment each file’s data with the data’s source, group and then count sources to find matches. While more lines of code than a JOIN approach, one can easily add in more metadata to each line (such as if the line is duplicated in each file but of a different quantity of repication).

Code

SET job.name 'Diff(1) Via Join'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS First: chararray;
b = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Second: chararray;

-- Combine Data
combined = JOIN a BY First FULL OUTER, b BY Second;

-- Output Data
SPLIT combined INTO first_raw IF Second IS NULL,
                    second_raw IF First IS NULL;
first_only = FOREACH first_raw GENERATE First;
second_only = FOREACH second_raw GENERATE Second;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

 

JOIN

One can perform a difference via an outer-join as well. Here one has a more compact expression to achieve the desired results only doing a FULL OUTER join to only return records (lines) which appear in one file but not the other; then one can return the results to report the asymmetry. The JOIN approach does collapse duplicates (so, if one file has more duplicates than the other, this approach will not output the duplicate).

Code

SET job.name 'Diff(1)'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a_raw = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
b_raw = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;

a_tagged = FOREACH a_raw GENERATE Row, (int)1 AS File;
b_tagged = FOREACH b_raw GENERATE Row, (int)2 AS File;

-- Combine Data
combined = UNION a_tagged, b_tagged;
c_group = GROUP combined BY Row;

-- Find Unique Lines
%declare NULL_BAG 'TOBAG(((chararray)\'place_holder\',(int)0))'

counts = FOREACH c_group {
             firsts = FILTER combined BY File == 1;
             seconds = FILTER combined BY File == 2;
             GENERATE
                FLATTEN(
                        (COUNT(firsts) - COUNT(seconds) == (long)0 ? $NULL_BAG :
                            (COUNT(firsts) - COUNT(seconds) > 0 ?
                                TOP((int)(COUNT(firsts) - COUNT(seconds)), 0, firsts) :
                                TOP((int)(COUNT(seconds) - COUNT(firsts)), 0, seconds))
                        )
                ) AS (Row, File); };

-- Output Data
SPLIT counts INTO first_only_raw IF File == 1,
                  second_only_raw IF File == 2;
first_only = FOREACH first_only_raw GENERATE Row;
second_only = FOREACH second_only_raw GENERATE Row;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

 

Reference

Leave a Reply