Actor

juansgaitan/big-query

  • Latest build: 0.0.70 (2018-05-22)
  • Created: 2018-01-24
  • Last modified: 2018-05-21
  • Grade: 1

Description

Append a CSV file to a Google BigQuery table. Create service account credentials at https://console.cloud.google.com/, paste the contents of the downloaded JSON key file into the value of the 'CREDENTIALS' environment variable and set it as a secret.
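
The value of 'CREDENTIALS' should be the whole JSON key file generated for the service account. The actor only reads the project_id field explicitly, but the complete file is needed by the BigQuery client. All values below are placeholders, shown only to illustrate the expected shape:

{
    "type": "service_account",
    "project_id": "<YOUR_PROJECT_ID>",
    "private_key_id": "<KEY_ID>",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "<SERVICE_ACCOUNT>@<YOUR_PROJECT_ID>.iam.gserviceaccount.com",
    "client_id": "<CLIENT_ID>"
}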


API

To run the actor, send an HTTP POST request to:

https://api.apify.com/v2/acts/juansgaitan~big-query/runs?token=<YOUR_API_TOKEN>

The POST payload will be passed as input for the actor. For more information, read the docs.


Example input

Content type: application/json; charset=utf-8

{
    "datasetName": "instagram_hashtags",
    "tableIdAndStoreKey": "users",
    "storeId": "8CzGeDqENYxRRXNwK"
}
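
A run with this input can be started from Node.js using the built-in https module, for example (a minimal sketch; replace <YOUR_API_TOKEN> with your Apify API token):

const https = require('https');

// Same payload as the example input above.
const input = JSON.stringify({
  datasetName: 'instagram_hashtags',
  tableIdAndStoreKey: 'users',
  storeId: '8CzGeDqENYxRRXNwK'
});

const req = https.request({
  method: 'POST',
  hostname: 'api.apify.com',
  path: '/v2/acts/juansgaitan~big-query/runs?token=<YOUR_API_TOKEN>',
  headers: {
    'Content-Type': 'application/json; charset=utf-8',
    'Content-Length': Buffer.byteLength(input)
  }
}, (res) => {
  let body = '';
  res.on('data', (chunk) => { body += chunk; });
  res.on('end', () => console.log(res.statusCode, body));
});

req.on('error', console.error);
req.write(input);
req.end();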

Source code

Based on the apify/actor-node-basic Docker image (see docs).

// Source file for Hosted source in 'Source type'
const fs = require('fs');
const Apify = require('apify');
const json2csv = require('json2csv'); // eslint-disable-line
const BigQuery = require('@google-cloud/bigquery'); // eslint-disable-line

const { log, error } = console;

let isStoreIdSet = false;
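// Reads the previously stored results under `key` from the configured
// key-value store; when new items are supplied, previous entries sharing the
// same `filterKey` value are replaced and the merged array is written back.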
async function storeOrGetResults(key, items = [], filterKey) {
  if (!isStoreIdSet || !key) {
    throw new Error(`Error while storing or getting results. Missing ${key ?
      'storeId in store' : 'key value'}.`);
  }

  const { keyValueStores } = Apify.client;
  const record = await keyValueStores.getRecord({ key });
  const storeRecord = record && record.body ? record.body : [];
  let previous = typeof storeRecord === 'string' ? JSON.parse(storeRecord) : storeRecord;

  if (items.length === 0) {
    return { previous };
  }

  const current = items.slice();
  if (current.length && previous.length && filterKey) {
    const cache = current.reduce((object, item) => (
      Object.assign(object, { [item[filterKey]]: true })
    ), {});
    previous = previous.filter(item => !cache[item[filterKey]]);
  }

  const next = previous.concat(current);
  if (previous.length !== current.length) {
    await keyValueStores.putRecord({
      key,
      body: JSON.stringify(next)
    });
  } else {
    log('No state modifications required.');
  }

  log('Previous results:', previous.length);
  log('Current results:', current.length);
  log('Next results:', next.length);
  return { previous, current, next };
}

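// Returns the existing BigQuery dataset with the given name, or creates it
// if it does not exist yet.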
async function createDataset(datasetName, bigquery) {
  const [datasets] = await bigquery.getDatasets();
  const currentDataset = datasets.find(dataset => dataset.id === datasetName);
  if (currentDataset) {
    return currentDataset;
  }
  return bigquery
    .createDataset(datasetName)
    .then((results) => {
      const [dataset] = results;
      log(`Dataset ${dataset.id} created.`);
      return dataset;
    })
    .catch((err) => {
      error('Error while creating dataset:', err);
    });
}

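// Returns the table with the given id from the dataset, creating it first
// if it is missing.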
async function getOrCreateTable(dataset, tableId) {
  const [tables] = await dataset.getTables();
  const currentTable = tables.find(({ id }) => id === tableId);
  if (currentTable) {
    log(`Found ${tableId} table.`);
    return currentTable;
  }
  return dataset
    .createTable(tableId)
    .then((results) => {
      const [table] = results;
      log(`Table ${table.id} created.`);
      return table;
    })
    .catch((err) => {
      error('Error while creating table:', err);
    });
}
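
// Loads the given CSV file into the BigQuery table and waits for the load
// job to finish, reporting any errors the job returns.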
async function uploadFile(table, filename) {
  let job;
  const config = {
    autodetect: true
  };
  return table
    .load(filename, config)
    .then((data) => {
      [job] = data;
      log(`Job ${job.id} started.`);

      // Wait for the load job to finish
      return job.promise();
    })
    .then(() => {
      // Check the completed job's status for errors
      const { status } = job.metadata;
      const errors = status && status.errors;
      if (errors && errors.length > 0) {
        throw errors;
      }
    })
    .then(() => {
      log(`Job ${job.id} completed.`);
    })
    .catch((err) => {
      error('Error while uploading file:', err);
    });
}

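// Main flow: validate the input, read the collected items from the
// key-value store, filter out rows already present in BigQuery and upload
// the remaining ones as a CSV file.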
Apify.main(async () => {
  const input = await Apify.getValue('INPUT');
  const {
    tableIdAndStoreKey,
    datasetName,
    storeId
  } = input;

  if (!(datasetName && tableIdAndStoreKey && storeId)) {
    throw new Error('Received invalid input: datasetName, tableIdAndStoreKey and storeId are all required.');
  }

  Apify.client.setOptions({ storeId });
  isStoreIdSet = true;

  const key = tableIdAndStoreKey.toUpperCase();
  const tableId = tableIdAndStoreKey.toLowerCase();

  log('Currently running for:', key);
  const { previous: previousData } = await storeOrGetResults(key);
  if (!previousData.length) {
    throw new Error(`The key-value store record under the '${key}' key is empty.`);
  }
  log(`Items in ${key}:`, previousData.length);

  const credentials = process.env.CREDENTIALS;
  if (!credentials) {
    throw new Error('Missing the CREDENTIALS environment variable.');
  }
  const { project_id: projectId } = JSON.parse(credentials);
  log('Project ID:', projectId);

  const keyFilename = '/credentials.json';
  try {
    fs.writeFileSync(keyFilename, credentials);
  } catch (err) {
    throw new Error(`Error while saving credentials: ${err.message}`);
  }

  const bigquery = new BigQuery({ projectId, keyFilename });

  log('Getting or creating dataset...');
  const dataset = await createDataset(datasetName, bigquery);
  log('Dataset id:', dataset.id);

  log('Getting or creating table...');
  const table = await getOrCreateTable(dataset, tableId);

  let rows;
  try {
    [rows] = await table.getRows();
  } catch (err) {
    throw new Error(`Error while extracting rows from table: ${err.message}`);
  }
  log('BigQuery table current results:', rows.length);

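  // Keep only items that are not yet in the BigQuery table, deduplicating
  // by username for the 'users' table and by referrer for any other table.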
  let memo;
  let data;
  if (tableId === 'users') {
    memo = rows.reduce((cache, { username }) => (
      Object.assign(cache, { [username]: true })
    ), {});
    data = previousData.filter(({ username }) => {
      if (memo[username] || !username) {
        return false;
      }
      memo[username] = true;
      return true;
    });
  } else {
    memo = rows.reduce((cache, { referrer }) => (
      Object.assign(cache, { [referrer]: true })
    ), {});
    data = previousData.filter(({ referrer }) => {
      if (memo[referrer] || !referrer) {
        return false;
      }
      memo[referrer] = true;
      return true;
    });
  }

  if (data.length === 0) {
    log('No new results to insert.');
    log('Done.');
    return null;
  }

  let csv;
  const [firstItem] = data;
  const pathToFile = `${tableId}.csv`;
  const fields = Object.keys(firstItem);
  try {
    csv = json2csv({ data, fields });
  } catch (err) {
    throw new Error(`Error while converting JSON to CSV: ${err.message}`);
  }

  try {
    fs.writeFileSync(pathToFile, csv);
  } catch (err) {
    throw new Error(`Error while saving CSV file: ${err.message}`);
  }
  log(`File '${pathToFile}' saved.`);

  log(`Adding ${data.length} new results to the '${tableId}' table...`);
  await uploadFile(table, pathToFile);

  log('Done.');
  return null;
});