Actor

jaroslavhejlek/bf-upload-to-mysql

  • Builds: latest 0.0.46 / 2018-01-09
  • Created 2017-11-23
  • Last modified 2018-01-09

Description

Loads data from crawler executions and uploads it directly to MySQL.


API

To run the actor, send an HTTP POST request to:

https://api.apify.com/v2/acts/jaroslavhejlek~bf-upload-to-mysql/runs?token=<YOUR_API_TOKEN>

The POST payload will be passed as input for the actor. For more information, read the docs.
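
For example, a run can be started from Node.js with the request-promise package the actor itself uses (a minimal sketch; <YOUR_API_TOKEN> is a placeholder and the body is a subset of the example input below):

const request = require('request-promise');

// Start the actor run; the JSON body becomes the actor's INPUT
request({
    method: 'POST',
    url: 'https://api.apify.com/v2/acts/jaroslavhejlek~bf-upload-to-mysql/runs?token=<YOUR_API_TOKEN>',
    json: true,
    body: {
        CRAWLER_AUTHOR_ID: 'n4erFvw2ob2h43wZs',
        CRAWLER_ID: 'FhGHzL2gaxxafZrgP',
        MYSQL_TABLE: 'data',
    },
}).then((response) => console.log('Run started:', response));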


Example input

Content type: application/json; charset=utf-8

{ 
    "CRAWLER_AUTHOR_ID": "n4erFvw2ob2h43wZs",
    "CRAWLER_ID": "FhGHzL2gaxxafZrgP",
    "IS_BLACK_FRIDAY_CRAWLER": false,
    "MAX_INSERT_LINES": 3000,
    "MYSQL_TABLE": "data",
    "TABLE_DEFAULTS": {
      "date": null,
      "eshop": null,
      "executionId": null,
      "itemUrl": null,
      "itemName": null,
      "discountedName": null,
      "currentPrice": null,
      "originalPrice": null,
      "isBlackFridayDeal": 0,
      "url": null,
      "img": null,
    }
}
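
The keys in TABLE_DEFAULTS must match the columns of MYSQL_TABLE, since the source code below builds its INSERT statements from them (the code also reads an optional ESHOP value from the input, not shown here). A hypothetical schema matching the example input might look like this; the column types are an assumption:

CREATE TABLE data (
    date DATETIME,
    eshop VARCHAR(255),
    executionId VARCHAR(32),
    itemUrl TEXT,
    itemName VARCHAR(255),
    discountedName VARCHAR(255),
    currentPrice DECIMAL(10, 2),
    originalPrice DECIMAL(10, 2),
    isBlackFridayDeal TINYINT(1) DEFAULT 0,
    url TEXT,
    img TEXT
);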

Source code

Based on the apify/actor-node-basic Docker image (see docs).
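
Beyond the base image, the source requires a few npm packages; a minimal package.json covering them could look like this (the version ranges are assumptions):

{
    "name": "bf-upload-to-mysql",
    "version": "0.0.46",
    "dependencies": {
        "apify": "*",
        "bluebird": "*",
        "mysql": "*",
        "request": "*",
        "request-promise": "*",
        "underscore": "*"
    }
}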

const Apify = require('apify');
const request = require('request-promise');
const _ = require('underscore');
const mysql = require('mysql');
const Promise = require('bluebird');

// Mysql host and credentials are stored in secure environment variables
const MYSQL_CREDENTIALS = {
    host: process.env.MYSQL_HOST,
    user: process.env.MYSQL_USER,
    database: process.env.MYSQL_DATABASE,
    password: process.env.MYSQL_PASSWORD,
};
const TOKEN = process.env.TOKEN;

const client = Apify.client.crawlers;

// Global variables loaded from INPUT when the actor starts
let mysqlTable;
let maxInsertLines;
let defaultValues;
let keys;
let allKeys;
let crawlerId;
let userId;
let isCrawlerForBlackFridayCategory;
let eshop;


// Statistics
let totalItems = 0;
let totalExecs = 0;


// Mysql connection and global query function
let connection;
let queryPromised;


/**
 * Creates connection to mysql server
 * @return {Promise} Promise of successful connection
 */
const connect = () => {
    return Promise
        .resolve()
        .then(() => {
            connection = mysql.createConnection(MYSQL_CREDENTIALS);
            queryPromised = Promise.promisify(connection.query, { context: connection });

            return Promise.promisify(connection.connect, { context: connection })();
        });
};

/**
 * Runs promises in series, ensuring that they are fulfilled in the correct order
 * @param  {Array<Promise|Function>} promises Promises, or functions returning promises
 * @return {Promise<Array>}                   Promise of an array with all results
 */
const sequentializePromises = (promises) => {
    const results = [];

    if (!promises.length) return Promise.resolve([]);

    const firstPromise = promises.shift();

    return promises
        .reduce((prev, next) => {
            return prev.then((data) => {
                results.push(data);

                return next.then ? next : next();
            });
        }, firstPromise.then ? firstPromise : firstPromise())
        .then((data) => {
            results.push(data);

            return results;
        });
};
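
// For example, sequentializePromises([() => queryPromised('SELECT 1'), () => queryPromised('SELECT 2')])
// runs the two queries one after another and resolves to an array of both results.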


/**
 * Inserts data from a single crawler execution into the mysql table
 * @param  {Object} execution Execution object returned by the Apify API
 * @return {Promise}          Promise of finished insertion
 */
const insertExecutionToMysql = async (execution) => {
    console.log('loading results for execution', execution._id);
    return client
        .getExecutionResults({
            executionId: execution._id,
            simplified: 1,
            offset: 0,
            limit: 2000
        })
        .then((data) => {
            console.log('Loaded results for execution', execution._id);
            totalItems += data.items.length;
            totalExecs += 1;
            
            const items = data.items
                // Clean up results: drop items without a product name, with non-numeric prices, or with installment ('-na-splatky') URLs
                .filter((item) => item.itemName && !isNaN(item.currentPrice || 0) && !isNaN(item.originalPrice || 0) && item.url.indexOf('-na-splatky') === -1)
                // map values to table definition
                .map((item) => {
                    item.date = execution.startedAt;
                    item.eshop = eshop;
                    item.executionId = execution._id;
                    item.isBlackFridayDeal = isCrawlerForBlackFridayCategory ? 1 : 0;
                    
                    _.defaults(allKeys, item);
                    
                    return _.values(Object.assign({}, defaultValues, _.pick(item, keys)));
                })
                .slice(0, 20000);
            
            if (items.length === 0) {
                console.log(`${execution._id} => EMPTY!`);
                return;
            }

            // split items into chunks of at most maxInsertLines rows
            const chunks = _
                .chain(items)
                .groupBy((element, index) => {
                    return Math.floor(index / maxInsertLines);
                })
                .toArray()
                .value();
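            // e.g. 7500 filtered items with maxInsertLines = 3000 yield chunks of 3000, 3000 and 1500 rows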

            console.log(`${execution._id} => ${items.length} items ${chunks.length} chunks`);
            
            // generate promises for queries
            let done = 0;
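            // The nested-array placeholder below makes the mysql driver expand each
            // chunk into a single bulk statement: INSERT INTO ... VALUES (...),(...),...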
            const promiseGenerators = chunks.map((chunk) => {
                return queryPromised(`INSERT INTO ${mysqlTable} (${keys.join(',')}) VALUES ?`, [chunk])
                    .then(() => {
                        done++;
                        console.log(`${done}/${chunks.length} chunk`);
                    });
            });

            // start execution of promises serially
            return sequentializePromises(promiseGenerators);
        });
};

/**
 * Processes all executions for the provided crawlerId
 * @param  {Array<String>} previouslyParsedExecutions Array of ids of executions already stored in mysql
 * @return {Promise} Promise of successful storage into mysql
 */
const getExecutions = async (previouslyParsedExecutions) => {
    let done = 0;
    
    // get all executions for crawler id
    const executions = await client.getListOfExecutions({
        crawlerId,
        userId,
        token: TOKEN,
    });
    
    // Helper which ensures that an execution is attempted again if something fails
    const processExecution = (execution) => {
        return insertExecutionToMysql(execution)
            .then(() => {
                done++;
                console.log(`${done}/${executions.items.length} ${execution._id}`); 
            })
            .catch((err) => {
                console.error(err.message);
                console.log('Error reconnecting and repeating request ...');
                
                return (new Promise((resolve) => setTimeout(resolve, 1000)))
                    .then(() => connect())
                    .then(() => processExecution(execution));
            });
    };
    
    // Process all found executions
    const promiseGenerators = executions.items
        .filter((execution) => execution.status === 'SUCCEEDED')
        .filter((execution) => !previouslyParsedExecutions.includes(execution._id))
        .map((execution) => {
            return () => processExecution(execution);
        });
        
    // ensure that all executions are processed in the correct order
    return sequentializePromises(promiseGenerators);
};

Apify.main(async () => {
    await connect();
    
    // get settings from input
    const input = await Apify.getValue('INPUT');
    
    console.log(input);

    mysqlTable = input.MYSQL_TABLE;
    maxInsertLines = input.MAX_INSERT_LINES;
    defaultValues = input.TABLE_DEFAULTS;
    keys = _.keys(defaultValues);
    allKeys = {};
    crawlerId = input.CRAWLER_ID;
    userId = input.CRAWLER_AUTHOR_ID;
    eshop = input.ESHOP;
    isCrawlerForBlackFridayCategory = input.IS_BLACK_FRIDAY_CRAWLER;

    // Find all previously parsed executions; this ensures that we don't store the same execution twice.
    let previouslyParsedExecutions = await queryPromised(`SELECT DISTINCT executionId FROM ${mysqlTable};`);
    previouslyParsedExecutions = previouslyParsedExecutions.map(data => data.executionId);
    
    // process executions
    await getExecutions(previouslyParsedExecutions);
    
    // log statistics when everything is finished
    console.log('Executions total:');
    console.log(totalExecs);
    console.log('Lines total:');
    console.log(totalItems);
});