-- Register the Advanced Matching Module (AMM) Hive UDF jar.
-- <Directory path> is a placeholder for the directory that holds the jar.
ADD JAR <Directory path>/amm.hive.${project.version}.jar;

-- Bind function names to the classes needed for this job to run.
-- The quoted strings are the class names. The queries in this script call the functions as rowid and bestofbreed.

-- rowid is a UDF (User Defined Function) that associates an incremental unique integer number with each row of the data.
-- Best of Breed needs this number to maintain the order of rows while creating groups.
CREATE TEMPORARY FUNCTION rowid AS 'com.pb.bdq.hive.common.RowIDGeneratorUDF';

-- Best of Breed is implemented as a UDAF (User Defined Aggregation Function).
-- It processes one group of rows at a time and generates the result for that group of rows.
CREATE TEMPORARY FUNCTION bestofbreed AS 'com.pb.bdq.amm.process.hive.consolidation.bestofbreed.BestOfBreedUDAF';

-- Disable map-side aggregation.
-- NOTE(review): presumably required so that each group reaches the UDAF as a whole - confirm.
set hive.map.aggr = false;
-- Set the Best of Breed rule using configuration property 'pb.bdq.consolidation.rule'.
-- The value is a JSON document that spans several lines and ends at the terminating semicolon.
-- NOTE(review): keep comment lines out of the value itself - they would presumably become part of the property. Confirm before editing.
-- As written, the document contains:
--   * Two entries in consolidationConditions. Both are guarded by the same conjoined rule (joinType AND):
--     operation LONGEST on field c5 together with operation IS_NOT_EMPTY on field c9.
--       - Actions of the first entry: sourceData c2 to destination c2 (copyFromField true),
--         and sourceData "Admin" to destination c4 (copyFromField false).
--       - Actions of the second entry: sourceData "Changed" to destination c10 (copyFromField false),
--         sourceData c5 to destination c6 (copyFromField true),
--         and sourceData c10 to destination c10 (copyFromField true, accumulate true).
--   * keepOriginalRecords set to true and buildTemplateRecord set to true.
--   * One entry in templateRules: a conjoined rule (joinType OR) of operation CONTAINS with value "li" on field c1
--     and operation LONGEST on field c5, with an empty actions list.
-- NOTE(review): LONGEST on c5 is declared with valueNumeric true in consolidationConditions but false in templateRules - confirm the difference is intended.
set pb.bdq.consolidation.rule={"consolidationConditions":[
{"consolidationRule":{"conditionClass":"conjoinedRule", "joinType":"AND",
"consolidationRules":[{"conditionClass":"simpleRule", "operation":"LONGEST", "fieldName":"c5", "value":null, "valueNumeric":true, "valueFromField":false},
{"conditionClass":"simpleRule", "operation":"IS_NOT_EMPTY", "fieldName":"c9", "value":null, "valueNumeric":false, "valueFromField":false}]},
"actions":[{"accumulate":false, "copyFromField":true, "sourceData":"c2", "destinationFieldName":"c2"},
{"accumulate":false, "copyFromField":false, "sourceData":"Admin", "destinationFieldName":"c4"}]},
{"consolidationRule":{"conditionClass":"conjoinedRule", "joinType":"AND",
"consolidationRules":[{"conditionClass":"simpleRule", "operation":"LONGEST", "fieldName":"c5", "value":null, "valueNumeric":true, "valueFromField":false},
{"conditionClass":"simpleRule", "operation":"IS_NOT_EMPTY", "fieldName":"c9", "value":null, "valueNumeric":false, "valueFromField":false}]},
"actions":[{"accumulate":false, "copyFromField":false, "sourceData":"Changed", "destinationFieldName":"c10"},
{"accumulate":false, "copyFromField":true, "sourceData":"c5", "destinationFieldName":"c6"},
{"accumulate":true, "copyFromField":true, "sourceData":"c10", "destinationFieldName":"c10"}]}],
"keepOriginalRecords":true, "buildTemplateRecord":true,
"templateRules":[{"consolidationRule":{"conditionClass":"conjoinedRule", "joinType":"OR",
"consolidationRules":[{"conditionClass":"simpleRule", "operation":"CONTAINS", "fieldName":"c1", "value":"li", "valueNumeric":false, "valueFromField":false},
{"conditionClass":"simpleRule", "operation":"LONGEST", "fieldName":"c5", "value":null, "valueNumeric":false, "valueFromField":false}]}, "actions":[]}]};
-- Sort field: configuration property 'pb.bdq.consolidation.sort.field' must name the alias
-- that the queries give to the rowid(*) column, which is id in this script.
set pb.bdq.consolidation.sort.field=id;
-- Header: configuration property 'pb.bdq.consolidation.header' lists the field names, including the id alias.
-- The list here is in the same order as the arguments passed to bestofbreed in the queries (c1 to c10, then id).
set pb.bdq.consolidation.header=c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,id;
-- Execute the Best of Breed query on the desired table (databob here) and return the rows to the session.
-- Reading from the inside out:
--   1. The innermost SELECT adds id by calling the rowid UDF. rowid must be present in the query
--      to maintain the ordering of the data while reading.
--   2. The middle SELECT groups the rows by c3 and passes every column plus id to bestofbreed,
--      in the same order as the configured header.
--   3. Best of Breed returns a list of maps containing <key=value> pairs. Each map in the list corresponds
--      to a row in the group. LATERAL VIEW explode turns that list into one row per map, and the outer
--      SELECT fetches the fields from each map by key.
-- Every projected field carries an explicit alias. Without one Hive assigns generated names (_c0, _c1, ...),
-- which would not match the documented output header.
SELECT
    exploded.record["c1"] AS c1,
    exploded.record["c2"] AS c2,
    exploded.record["c3"] AS c3,
    exploded.record["c4"] AS c4,
    exploded.record["c5"] AS c5,
    exploded.record["c6"] AS c6,
    exploded.record["c7"] AS c7,
    exploded.record["c8"] AS c8,
    exploded.record["c9"] AS c9,
    exploded.record["c10"] AS c10,
    exploded.record["CollectionRecordType"] AS CollectionRecordType
FROM (
    SELECT
        bestofbreed(
            numbered_rows.c1,
            numbered_rows.c2,
            numbered_rows.c3,
            numbered_rows.c4,
            numbered_rows.c5,
            numbered_rows.c6,
            numbered_rows.c7,
            numbered_rows.c8,
            numbered_rows.c9,
            numbered_rows.c10,
            numbered_rows.id
        ) AS matchgroup
    FROM (
        SELECT
            c1,
            c2,
            c3,
            c4,
            c5,
            c6,
            c7,
            c8,
            c9,
            c10,
            rowid(*) AS id
        FROM databob
    ) numbered_rows
    GROUP BY numbered_rows.c3
) AS grouped
LATERAL VIEW explode(grouped.matchgroup) exploded AS record;
-- Dump the Best of Breed output to files under the local directory /home/hadoop/bestofbreed/.
-- OVERWRITE is specified, so existing content of that directory is replaced.
-- The SELECT has the same shape as an interactive Best of Breed query: rowid(*) numbers the rows of databob,
-- bestofbreed aggregates each c3 group into a list of maps, and LATERAL VIEW explode yields one output row per map.
-- Fields are written comma separated. The column aliases are for readability only.
-- NOTE(review): the collection delimiter is given as the two-character string '||'. Hive's delimited row format
-- is documented for single-character delimiters, so only the first character may take effect - confirm.
-- The projected columns are map lookups, so the collection and map-key delimiters are presumably never used.
INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/bestofbreed/'
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    COLLECTION ITEMS TERMINATED BY '||'
    MAP KEYS TERMINATED BY ':'
SELECT
    exploded.record["c1"] AS c1,
    exploded.record["c2"] AS c2,
    exploded.record["c3"] AS c3,
    exploded.record["c4"] AS c4,
    exploded.record["c5"] AS c5,
    exploded.record["c6"] AS c6,
    exploded.record["c7"] AS c7,
    exploded.record["c8"] AS c8,
    exploded.record["c9"] AS c9,
    exploded.record["c10"] AS c10,
    exploded.record["CollectionRecordType"] AS CollectionRecordType
FROM (
    SELECT
        bestofbreed(
            numbered_rows.c1,
            numbered_rows.c2,
            numbered_rows.c3,
            numbered_rows.c4,
            numbered_rows.c5,
            numbered_rows.c6,
            numbered_rows.c7,
            numbered_rows.c8,
            numbered_rows.c9,
            numbered_rows.c10,
            numbered_rows.id
        ) AS matchgroup
    FROM (
        SELECT
            c1,
            c2,
            c3,
            c4,
            c5,
            c6,
            c7,
            c8,
            c9,
            c10,
            rowid(*) AS id
        FROM databob
    ) numbered_rows
    GROUP BY numbered_rows.c3
) AS grouped
LATERAL VIEW explode(grouped.matchgroup) exploded AS record;
--sample input data
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
--| c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 | c10 |
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
--| Duplicate| 87 | 1 | |ANNA ABNEY| ANNA | | ABNEY | A | 18 |
--| Duplicate| 77 | 1 | |ANNA A ANN| ANDREA | | ANNAKAY | A | 196 |
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
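-- Sample output data (each input row is returned tagged Primary or Secondary, followed by one BestOfBreed row for the group).
-- NOTE(review): this sample shows ARANOW in c7, which does not occur in the sample input, and it does not show the
-- "Admin" and "Changed" values that the rule in this script writes to c4 and c10. It appears to come from a
-- different rule or data set - confirm before relying on it.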
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------------+
--| c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 | c10 |CollectionRecordType|
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------------+
--| Duplicate| 87 | 1 | |ANNA ABNEY| ANNA | | ABNEY | A | 18 | Primary |
--| Duplicate| 77 | 1 | |ANNA A ANN| ANDREA | ARANOW | ANNAKAY | A | 196 | Secondary |
--| Duplicate| 87 | 1 | |ANNA ABNEY| ANNA | ARANOW | ABNEY | A | 18 | BestOfBreed |
--+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------------------+