Act

drobnikj/executions-queue

  • Builds
  • latest 0.0.17 / 2018-01-23
  • Created 2017-11-20
  • Last modified 2018-01-23
  • grade 2

Description

This act processes a queue of crawler executions. When the queue finishes, it sends a notification email listing the finished executions. Input attributes:

- action (String) - use the value "startQueue" to start a queue
- maxParallelsExecutions (Number) - how many executions can run in parallel
- finishedHookMailTo (String|Array) - email address(es) to which the act sends a notification when the queue finishes
- finishWebhookUrl (String) - URL to which the act sends a POST request when the queue finishes
- finishWebhookData (Object) - data that the act merges into the queue info it sends to finishWebhookUrl
- queue (Array) - array of executions you want to run in the queue; each execution should contain:
  - crawlerId (String) - the crawler's customId or its internal ID (_id)
  - settings (Object) - overrides of the default crawler settings for this execution


API

To run the act, send an HTTP POST request to:

https://api.apify.com/v2/acts/drobnikj~executions-queue/runs?token=<YOUR_API_TOKEN>

The POST payload will be passed as input for the act. For more information, read the docs.


Example input

Content type: application/json; charset=utf-8

{
  "action": "startQueue",
  "maxParallelsExecutions": 1,
  "finishedHookMailTo": ["jakub.drobnik@apify.com", "drobnik.j@gmail.com"],
  "finishWebhookUrl": "https://webhook.site/1610bfcc-b76c-4f85-a6e5-528b6a98fc88",
  "finishWebhookData": {
    "test": "Hello, world!"
  },
  "queue": [
    {
      "crawlerId": "H8Xo8nhNDXwEf3LFJ",
      "settings": {
        "maxCrawledPages": 1
      }
    },
    {
      "crawlerId": "H8Xo8nhNDXwEf3LFJ",
      "settings": {
        "maxCrawledPages": 1
      }
    },
    {
      "crawlerId": "H8Xo8nhNDXwEf3LFJ",
      "settings": {
        "maxCrawledPages": 1
      }
    }
  ]
}

Source code

Based on the apify/actor-node-basic Docker image (see docs).

const Apify = require('apify');
const shortid = require('shortid');
const requestPromised = require('request-promise');
const _ = require('underscore');

// Pause between two processing rounds of the main loop, in milliseconds (10 s).
const WAIT_BETWEEN_PROCESS_QUEUE = 10 * 1000;

/**
 * Writes the queue state as a JSON record into a key-value store.
 *
 * @param {string} storeId - ID of the key-value store that holds queue records.
 * @param {string} queueId - Record key under which the queue state is stored.
 * @param {Object} queueState - Queue state to serialize with JSON.stringify.
 * @returns {Promise<void>}
 */
const persistQueueState = async (storeId, queueId, queueState) => {
    await Apify.client.keyValueStores.putRecord({
        storeId,
        key: queueId,
        contentType: 'application/json; charset=utf-8',
        body: JSON.stringify(queueState),
    });
};

/**
 * Runs one processing round of an executions queue:
 *  1. asks the API for the status of every execution in `running` and moves
 *     those that are no longer RUNNING to `finished`;
 *  2. starts executions from the head of `queue` until
 *     `maxParallelsExecutions` executions are running;
 *  3. persists the state when it changed (only if `storeId` is given);
 *  4. when nothing is queued or running, sends the optional notification
 *     mail(s) / webhook, sets `finishedAt` and persists the final state.
 *
 * @param {Object} queueState - Queue state, mutated in place. Uses `queue`
 *   (array of startExecution options), `running` / `finished` (objects keyed
 *   by execution ID), `maxParallelsExecutions`, and the optional
 *   `finishedHookMailTo`, `finishWebhookUrl`, `finishWebhookData`.
 * @param {string} queueId - Key of the queue record; also shown in the mail subject.
 * @param {string} [storeId] - Key-value store in which to persist the state.
 *   When omitted, nothing is persisted (keeps the old two-argument call working).
 * @returns {Promise<Object>} The same `queueState` object; `finishedAt` is set
 *   once the queue is empty and no execution is running.
 */
const periodicallyProcessQueue = async (queueState, queueId, storeId) => {
    // Explicit flag instead of comparing against a "previous state": the previous
    // code compared the object with an alias of itself, so it never detected changes.
    let stateChanged = false;

    // Check running executions in the queue.
    let runningExecutions = 0;
    for (const executionId of Object.keys(queueState.running)) {
        const executionDetail = await Apify.client.crawlers.getExecutionDetails({ executionId });
        if (executionDetail.status === 'RUNNING') {
            console.log(`Execution ${executionId} running`);
            runningExecutions++;
        } else {
            console.log(`Execution ${executionId} finished`);
            delete queueState.running[executionId];
            queueState.finished[executionId] = executionDetail;
            stateChanged = true;
        }
    }

    // Fill the free parallel slots from the head of the queue. A non-numeric
    // maxParallelsExecutions yields NaN, so no execution is started.
    const freeSlots = queueState.maxParallelsExecutions - runningExecutions;
    for (let i = 0; i < freeSlots && queueState.queue.length > 0; i++) {
        const crawler = queueState.queue.shift();
        stateChanged = true;
        if (!crawler) break;
        const startedExecution = await Apify.client.crawlers.startExecution(crawler);
        queueState.running[startedExecution._id] = startedExecution;
    }

    if (storeId && stateChanged) {
        await persistQueueState(storeId, queueId, queueState);
    }

    const runningCount = Object.keys(queueState.running).length;
    const finishedCount = Object.keys(queueState.finished).length;
    console.log(`Queue state: In queue - ${queueState.queue.length}, Running - ${runningCount}, Finished - ${finishedCount}`);
    console.log('----------------');

    // Still work to do: the caller will invoke us again in the next round.
    if (runningCount > 0 || queueState.queue.length > 0) return queueState;

    console.log('Queue finished!');
    if (queueState.finishedHookMailTo) {
        const mails = Array.isArray(queueState.finishedHookMailTo)
            ? queueState.finishedHookMailTo
            : [queueState.finishedHookMailTo];
        for (const to of mails) {
            await Apify.call('apify/send-mail', {
                to,
                subject: `Apify: Finished executions queue ${queueId}`,
                text: `Executions queue finished.\n\n Finished executions:\n ${Object.keys(queueState.finished)
                    .join('\n')}`,
            });
        }
    }
    if (queueState.finishWebhookUrl) {
        console.log(`Sending finish webhook data to ${queueState.finishWebhookUrl}`);
        // Merge into a fresh object so finishWebhookData does not leak into (and
        // possibly overwrite fields of) the queue state itself.
        const payload = Object.assign({}, queueState, queueState.finishWebhookData);
        const response = await requestPromised({
            method: 'POST',
            uri: queueState.finishWebhookUrl,
            body: payload,
            json: true,
        });
        console.log(response);
    }
    queueState.finishedAt = new Date();
    // Persist the final state so the stored record shows the queue as finished.
    if (storeId) {
        await persistQueueState(storeId, queueId, queueState);
    }
    return queueState;
};

/**
 * Act entry point.
 *
 * With `action: "startQueue"` it creates a new queue record (random shortid key)
 * in the act's global key-value store. With any other action it resumes the queue
 * stored under `input.queueId`; the act uses `action: "processQueue"` when it
 * restarts itself. It then processes the queue every WAIT_BETWEEN_PROCESS_QUEUE ms
 * until the queue finishes or the run is about to time out. In the timeout case,
 * the state is saved and a new run of this act continues the work.
 */
Apify.main(async () => {
    // Start a new run of this act this long before the current run's timeout.
    const RESTART_MARGIN_MILLIS = 60000;

    // Get input of your act
    const input = await Apify.getValue('INPUT');
    console.log('My input:');
    console.dir(input);
    if (!input) {
        throw new Error('Input is missing! Provide "action" and either "queue" or "queueId".');
    }

    // Create or get the act's global store (one per act and user).
    const { actId, userId } = Apify.getEnv();
    const actGlobalStoreName = `exec-manager-${actId}-${userId}`;
    const globalStore = await Apify.client.keyValueStores.getOrCreateStore({ storeName: actGlobalStoreName });
    const storeId = globalStore.id;
    console.log('Global Store:');
    console.log(globalStore);

    const saveQueueState = (queueId, queueState) => Apify.client.keyValueStores.putRecord({
        storeId,
        key: queueId,
        contentType: 'application/json; charset=utf-8',
        body: JSON.stringify(queueState),
    });

    let queueId;
    let queueState;
    // Choose the action based on input.action.
    if (input.action === 'startQueue') {
        if (!Array.isArray(input.queue)) {
            throw new Error('Input "queue" must be an array of executions when action is "startQueue".');
        }
        queueId = shortid.generate();
        queueState = Object.assign({
            running: {},
            finished: {},
        }, input);
        await saveQueueState(queueId, queueState);
        console.log(`Queue ${queueId} started!`);
    } else {
        queueId = input.queueId;
        if (!queueId) {
            throw new Error('Input "queueId" is required unless action is "startQueue".');
        }
        const record = await Apify.client.keyValueStores.getRecord({
            storeId,
            key: queueId,
        });
        if (!record || !record.body) {
            throw new Error(`Queue ${queueId} was not found in store ${storeId}.`);
        }
        // NOTE(review): the client is expected to parse JSON bodies; parse defensively
        // in case it returns the raw string.
        queueState = typeof record.body === 'string' ? JSON.parse(record.body) : record.body;
    }

    // If APIFY_TIMEOUT_AT is not set, this is an Invalid Date. The comparison below is
    // then always false, so the act keeps running without restarting itself.
    const actTimeoutAt = new Date(process.env.APIFY_TIMEOUT_AT);
    while (true) {
        queueState = await periodicallyProcessQueue(queueState, queueId, storeId);
        // break if queue is finished
        if (queueState.finishedAt) break;
        // Restart the act shortly before the run's timeout.
        if (actTimeoutAt < new Date(Date.now() + RESTART_MARGIN_MILLIS)) {
            console.log(`Queue ${queueId} going to restart!`);
            await saveQueueState(queueId, queueState);
            // NOTE(review): timeoutSecs: 1 presumably makes Apify.call return soon after the
            // new run starts instead of waiting for it. Confirm against the SDK version in use.
            await Apify.call(actId, { action: 'processQueue', queueId }, { timeoutSecs: 1 });
            console.log(`Queue ${queueId} at new act!`);
            break;
        }
        await new Promise(resolve => setTimeout(resolve, WAIT_BETWEEN_PROCESS_QUEUE));
    }
});