Actor

petr_cermak/datasets-compare

  • Builds
  • latest 0.0.5 / 2018-06-13
  • Created 2018-04-25
  • Last modified 2018-06-13
  • grade 3

Description

Actor for comparing crawler execution results stored in two datasets (an older and a newer run). By default, the final result set contains only new and updated records.


API

To run the actor, send an HTTP POST request to:

https://api.apify.com/v2/acts/petr_cermak~datasets-compare/runs?token=<YOUR_API_TOKEN>

The POST payload will be passed as input for the actor. For more information, read the docs.


Example input

Content type: application/json; charset=utf-8

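{
  "oldDataset": OLD_DATASET_ID,
  "newDataset": NEW_DATASET_ID,
  "idAttr": ID_ATTRIBUTE_NAME,
  "return": WHICH_RECORDS_TO_RETURN,
  "addStatus": ADD_TEXT_STATUS
}

"return" lists the record categories to include — any of new, updated, deleted and unchanged, e.g. "new, updated, deleted" (default: "new, updated").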

Source code

Based on the apify/actor-node-basic Docker image (see docs).

const Apify = require('apify');
const _ = require('underscore');
const Promise = require('bluebird');

/**
 * Builds the lookup key that identifies a record across the two datasets.
 *
 * @param {Object} result - a dataset record; any falsy value yields null
 * @param {string|string[]} idAttr - one attribute name, or a list of names whose
 *     values are joined with "_" into a composite key
 * @returns {*} the key value, or null when `result` is falsy
 */
function createKey(result, idAttr){
    if(!result){return null;}
    if(Array.isArray(idAttr)){
        const parts = idAttr.map(attrName => result[attrName]);
        return parts.join('_');
    }
    return result[idAttr];
}

/**
 * Reads a dataset page by page and hands every non-empty page to `processPage`.
 *
 * @param {string} datasetId - id of the dataset to read
 * @param {function(Object): Promise} processPage - awaited once per page with the
 *     raw getItems() response (callers read its `items` array)
 * @param {number} [offset=0] - index of the first item to read
 * @returns {Promise<void>} resolves once the API returns an empty page
 */
async function loadResults(datasetId, processPage, offset){
    const limit = 10000;
    let nextOffset = offset || 0;
    // Loop rather than recurse until a page comes back empty.
    for(;;){
        const page = await Apify.client.datasets.getItems({
            datasetId,
            offset: nextOffset,
            limit
        });
        // The response is either an array of items or an object with an `items` array.
        const pageSize = page ? (page.length || (page.items && page.items.length) || 0) : 0;
        if(!pageSize){break;}
        await processPage(page);
        // Advance by what was actually returned so a short page never skips items.
        nextOffset += pageSize;
    }
}

/**
 * Loads every record of the old dataset into a lookup map keyed by createKey().
 *
 * @param {string} oldExecId - id of the dataset holding the old results
 * @param {string|string[]} idAttr - attribute(s) forming the record key
 * @returns {Promise<Object>} map from key to record; records whose key is falsy
 *     are skipped, and a later record with the same key overwrites an earlier one
 */
async function createCompareMap(oldExecId, idAttr){
    // Null-prototype object: ids such as "constructor" or "__proto__" must be
    // stored and looked up as ordinary entries, not resolved against Object.prototype.
    const data = Object.create(null);
    let processed = 0;
    console.log('creating comparing map');
    await loadResults(oldExecId, async (fullResults) => {
        const results = _.chain(fullResults.items).flatten().value();
        for(const result of results){
            const key = createKey(result, idAttr);
            if(key){data[key] = result;}
        }
        processed += results.length;
        console.log('processed old results: ' + processed);
    });
    console.log('comparing map created');
    return data;
}

/**
 * Classifies every record of the new dataset as NEW, UPDATED or UNCHANGED by
 * looking it up in `compareMap`; records left in the map afterwards are DELETED.
 *
 * @param {string} newExecId - id of the dataset holding the new results
 * @param {Object|null} compareMap - key -> old record map; matched entries are
 *     deleted from it while comparing
 * @param {string|string[]} idAttr - attribute(s) forming the record key
 * @param {Object} settings - flags built by the entry point (returnNew, returnUpd,
 *     returnDel, returnUnc, addStatus, statusAttr, addChanges, changesAttr,
 *     stringifyChanges, updatedIf, useDataset)
 * @returns {Promise<Object[]|undefined>} the selected records, or undefined when
 *     `settings.useDataset` is set (records are pushed to the dataset instead)
 */
async function compareResults(newExecId, compareMap, idAttr, settings){
    let data = [];
    let processed = 0;
    let newCount = 0;
    let updCount = 0;
    let delCount = 0;
    let uncCount = 0;

    let pushData = null;
    if(settings.useDataset){
        // Buffer records and write them with Apify.pushData in batches of 100.
        pushData = async (value, flush) => {
            if(!flush){data.push(value);}
            // A flush with an empty buffer has nothing to write.
            if(data.length && (data.length >= 100 || flush)){
                await Apify.pushData(data);
                data = [];
            }
        };
    }
    else{pushData = async value => data.push(value);}

    // Marks a record as unchanged and emits it if requested.
    const addUnchanged = async (result) => {
        if(settings.addStatus){result[settings.statusAttr] = 'UNCHANGED';}
        if(settings.returnUnc){await pushData(result);}
        uncCount++;
    };

    // Marks a record as updated and emits it (optionally with its changed
    // attributes); `changes`, when given, replaces the computed change list.
    const addUpdated = async (result, oldResult, changes) => {
        if(settings.addStatus){result[settings.statusAttr] = 'UPDATED';}
        if(settings.returnUpd){
            if(settings.addChanges){
                const tChanges = changes || getChangeAttributes(oldResult, result);
                result[settings.changesAttr] = settings.stringifyChanges ? tChanges.join(', ') : tChanges;
            }
            await pushData(result);
        }
        updCount++;
    };

    console.log('comparing results');
    await loadResults(newExecId, async (fullResults) => {
        const results = _.chain(fullResults.items).flatten().value();
        for(const result of results){
            const id = createKey(result, idAttr);
            if(!id){
                console.log('record is missing id (' + idAttr + '): ' + JSON.stringify(result));
                continue;
            }
            const oldResult = compareMap ? compareMap[id] : null;
            if(!oldResult){
                if(settings.addStatus){result[settings.statusAttr] = 'NEW';}
                if(settings.returnNew){await pushData(result);}
                newCount++;
            }
            else if(_.isEqual(result, oldResult)){await addUnchanged(result);}
            else if(settings.updatedIf){
                // Only differences in the configured attributes count as an update.
                const changes = getChangeAttributes(oldResult, result);
                const intersection = _.intersection(settings.updatedIf, changes);
                if(intersection.length){await addUpdated(result, oldResult, intersection);}
                else{await addUnchanged(result);}
            }
            else{await addUpdated(result, oldResult);}
            // Matched ids are removed so whatever remains in the map is "deleted".
            if(compareMap){delete compareMap[id];}
        }
        processed += results.length;
        console.log('compared new results: ' + processed);
    });
    console.log('comparing results finished');

    if(compareMap && settings.returnDel){
        console.log('processing deleted results');
        for(const oldResult of Object.values(compareMap)){
            if(settings.addStatus){oldResult[settings.statusAttr] = 'DELETED';}
            await pushData(oldResult);
            delCount++;
        }
        console.log('processing deleted results finished');
    }

    console.log('new: ' + newCount + ', updated: ' + updCount +
                (settings.returnDel ? (', deleted: ' + delCount) : '') +
                ', unchanged: ' + uncCount);
    if(!settings.useDataset){return data;}
    // Await the final flush so the last partial batch is written before the
    // caller continues (and the run can finish).
    await pushData(null, true);
}

/**
 * Lists the attribute paths whose values differ between two records.
 *
 * Every own key of `obj1` is compared (with `_.isEqual`) to the same key of
 * `obj2`. Nested objects/arrays in `obj1` are descended into, and differing
 * leaves are reported as slash-separated paths ("a/b/c"). When a nested value
 * differs but no differing leaf is found under it (e.g. `{}` vs `null`, or an
 * array that only gained elements), the nested attribute's own path is
 * reported instead. Keys present only in `obj2` are not examined otherwise.
 *
 * @param {Object} obj1 - the old record; a falsy value yields no changes
 * @param {*} obj2 - the new record
 * @param {string} [prefix] - path prefix prepended to every reported key
 * @param {string[]} [out] - array to append to; a new one is created if omitted
 * @returns {string[]} `out` (or the new array) with the differing paths appended
 */
function getChangeAttributes(obj1, obj2, prefix, out){
    const changes = out ? out : [];
    if(!obj1){return changes;}
    for(const key of Object.keys(obj1)){
        const v1 = obj1[key];
        const v2 = obj2 ? obj2[key] : null;
        if(_.isEqual(v1, v2)){continue;}
        const path = prefix ? prefix + key : key;
        if(v1 !== null && typeof v1 === 'object'){
            const before = changes.length;
            // Carry the full path down so deeper levels report "a/b/c", not "b/c".
            getChangeAttributes(v1, v2, path + '/', changes);
            // The subtree differs but no leaf was reported: report the attribute itself.
            if(changes.length === before){changes.push(path);}
        }
        else{changes.push(path);}
    }
    return changes;
}

// Entry point: reads INPUT, validates it, builds the comparison settings and
// runs the comparison. Results are stored in the OUTPUT record, or pushed to the
// default dataset when `useDataset` is set.
Apify.main(async () => {
    const input = await Apify.getValue('INPUT');
    // Without this guard a run started with no INPUT record crashes on `input.data`.
    if(!input){
        return console.log('missing INPUT');
    }

    // The configuration may be wrapped in `data`, either as a JSON string or an object.
    const data = input.data ? (typeof input.data === 'string' ? JSON.parse(input.data) : input.data) : input;
    if(!data.idAttr){
        return console.log('missing "idAttr" attribute in INPUT');
    }
    if(!data.oldDataset){
        return console.log('missing "oldDataset" attribute in INPUT');
    }
    if(!data.newDataset){
        return console.log('missing "newDataset" attribute in INPUT');
    }

    if(data.token){Apify.client.setOptions({token: data.token});}
    if(data.userId){Apify.client.setOptions({userId: data.userId});}

    // `return` names the record categories to emit, e.g. "new, updated, deleted";
    // an array of category names is accepted as well.
    const returnValue = data.return || 'new, updated';
    const returnSpec = Array.isArray(returnValue) ? returnValue.join(',') : String(returnValue);
    const settings = {
        returnNew: /new/i.test(returnSpec),
        returnUpd: /updated/i.test(returnSpec),
        returnDel: /deleted/i.test(returnSpec),
        returnUnc: /unchanged/i.test(returnSpec),
        addStatus: Boolean(data.addStatus),
        addChanges: Boolean(data.addChanges),
        statusAttr: data.statusAttr ? data.statusAttr : 'status',
        changesAttr: data.changesAttr ? data.changesAttr : 'changes',
        stringifyChanges: data.stringifyChanges,
        updatedIf: data.updatedIf,
        useDataset: data.useDataset
    };

    // oldDataset was validated above, so the compare map is always built.
    const compareMap = await createCompareMap(data.oldDataset, data.idAttr);
    // NOTE(review): `input._id` takes precedence over newDataset — presumably for
    // legacy callers that pass an execution id; confirm before relying on it.
    const resultData = await compareResults(input._id || data.newDataset, compareMap, data.idAttr, settings);

    if(resultData){await Apify.setValue('OUTPUT', resultData);}
    console.log('finished');
});