How to Use Firebase Queues

If you are a Firebase user, you likely appreciate its plethora of built-in capabilities, such as real-time snapshots, Firestore triggers, and authentication. I was recently reading about Deno Queues and realized that, after years of using Firebase, I had never read about Firebase Queues. Just where are Firebase Queues?

A Firebase Queue Use-Case

Ayrshare is built upon Firebase and makes heavy use of Firebase Functions. Ayrshare is a social media API that allows users to publish to the social networks or get analytics on them. However, as we have grown, we realized that the way we process scheduled posts isn’t ideal and is pushing the limits of our architecture.

Whenever a user creates a scheduled post using our social API, we create an entry in a Firestore collection, which acts as a homegrown queue. A scheduled Firebase function runs once a minute to process all the pending posts. This means that multiple scheduled posts are processed together, an exception in one post might affect other posts, and the whole batch must finish within the function's maximum nine-minute timeout, per Firebase limits.

An obvious enhancement is for the scheduled function to process only one post at a time, but that would mean synchronous processing, which would significantly delay publishing. While we could overcome these challenges with more grit, at the end of the day we would be developing our own complex queue mechanism. What we really need is a way to queue functions and process them independently.

Enqueue Functions with Cloud Tasks

After a lot of searching for “Firebase Queues” you’ll come across something called “Enqueue Functions with Cloud Tasks”. As Firebase describes:

Task queue functions take advantage of Google Cloud Tasks to help your app run time-consuming, resource-intensive, or bandwidth-limited tasks asynchronously, outside your main application flow.

At first glance this might not sound like the right solution, but digging a little deeper you’ll see it is a perfect, if a little complex, answer.

To put it in layman's terms, you create a Firebase queue (a Cloud Tasks queue), add data to the queue, and a separate Firebase function runs for each item (task) in the queue.
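As a rough mental model only (this is not how Cloud Tasks is implemented), the flow can be sketched with an in-memory array standing in for the queue. The names InMemoryQueue, enqueue, and drain are made up for this illustration.

```javascript
// Toy in-memory stand-in for a task queue; illustrative only.
class InMemoryQueue {
  constructor(handler) {
    this.handler = handler; // called once per task
    this.tasks = [];
  }

  // Add one item of data (a task) to the queue.
  enqueue(data) {
    this.tasks.push(data);
  }

  // Run the handler for every task independently, so one failure
  // does not stop the others. Returns the outcome of each task.
  async drain() {
    const pending = this.tasks.splice(0);
    const results = await Promise.allSettled(
      pending.map(async (data) => this.handler(data))
    );
    return results.map((result) => result.status);
  }
}
```

The point of the sketch is the isolation: each task gets its own handler call and its own outcome, which is the property the homegrown Firestore queue lacked.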

A diagram showing the process of creating a queue.

How to Use The Firebase Queue

Let’s dive into some code that shows how to create a queue, add tasks to it, and process those tasks in Firebase.

Create and Add to the Queue

We will start with creating a queue and adding tasks.

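const { getFunctions } = require("firebase-admin/functions");

// log, el, and gatherScheduledPosts are app-specific helpers defined
// elsewhere; getFunctionUrl is shown in the next section.
const enqueueScheduledPosts = async () => {
  log("Starting enqueueScheduledPosts");

  // Collect every post that is due to be published.
  const posts = await gatherScheduledPosts();

  // taskQueue() takes the name of the task queue function; adjust it to
  // match the name you export.
  const queue = getFunctions().taskQueue("postTask");
  const targetUri = await getFunctionUrl("postTaskFunction");

  // Enqueue one task per post; each enqueue call returns a promise.
  const enqueues = [];
  posts.forEach((post) => {
    enqueues.push(
      queue.enqueue(post, {
        scheduleDelaySeconds: 1,
        dispatchDeadlineSeconds: 60 * 8, // 8 minutes
        uri: targetUri
      })
    );
  });

  // Wait for all tasks to be accepted by the queue.
  await Promise.all(enqueues).catch((err) =>
    el("Error with enqueues:", posts, err)
  );

  log("Ending enqueueScheduledPosts");
};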

The first call, gatherScheduledPosts, gathers all the posts that need to be processed.

We then get a reference to the queue named postTask with getFunctions().taskQueue("postTask"). All the posts will be added (enqueued) to this queue.

Next, we use getFunctionUrl("postTaskFunction") to get the targetUri of the function that will process each task on the queue. Finding the URI is discussed below.

We can now enqueue each task with queue.enqueue(data, options). The data parameter contains the info you want to process, and options is a JSON object of queue options. In our example we send:

  • scheduleDelaySeconds – How long to wait, in seconds from the time of enqueueing, before the task is dispatched. This is optional.
  • dispatchDeadlineSeconds – How long Cloud Tasks waits for the function to respond before the attempt is marked as failed. This is optional.
  • uri – The target URI of the function to process the tasks. This is required.
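If a post should go out at a specific time rather than after a fixed delay, the delay can be computed from the scheduled date. The sketch below is an illustration only: delaySecondsUntil and buildTaskOptions are hypothetical helper names, and it assumes each post carries its scheduled time as a Date.

```javascript
// Hypothetical helper: seconds from `now` until a post's scheduled time.
// Returns 0 for times in the past so the task is dispatched right away.
const delaySecondsUntil = (scheduledAt, now = new Date()) => {
  const diffMs = scheduledAt.getTime() - now.getTime();
  return Math.max(0, Math.ceil(diffMs / 1000));
};

// Hypothetical helper: build the options object passed as the second
// argument to queue.enqueue().
const buildTaskOptions = (scheduledAt, targetUri, now = new Date()) => ({
  scheduleDelaySeconds: delaySecondsUntil(scheduledAt, now),
  dispatchDeadlineSeconds: 60 * 8, // same 8 minutes as the example above
  uri: targetUri
});
```

The now parameter exists only so the calculation can be checked with a fixed clock; in production code you would normally omit it.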

The enqueue function returns a promise, so you can process them with the Promise.all function. Once done, all the tasks are on the queue and ready to be processed.
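One thing to keep in mind is that Promise.all rejects as soon as any single enqueue fails, so a catch on it reports one error for the whole batch. If you would rather know exactly which posts failed to enqueue, one option is Promise.allSettled. The sketch below is an assumption about how that could look, not Ayrshare's implementation; enqueueAll is a hypothetical name, and queue can be any object with an enqueue(data, options) method that returns a promise.

```javascript
// Enqueue every post and report failures individually.
// `queue` is any object with an enqueue(data, options) method that
// returns a promise, such as the task queue reference created above.
const enqueueAll = async (queue, posts, options) => {
  const results = await Promise.allSettled(
    posts.map(async (post) => queue.enqueue(post, options))
  );

  // Pair each rejected result with the post that caused it.
  const failed = [];
  results.forEach((result, i) => {
    if (result.status === "rejected") {
      failed.push({ post: posts[i], error: result.reason });
    }
  });

  return { enqueued: posts.length - failed.length, failed };
};
```

The failed list can then be logged or written back to Firestore so those posts are picked up on the next run.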

Find the Queue Function URI

The function URI is simply the URL of the function that will be called for each task: https://functionName-code-locId.a.run.app

While you can hard-code the URI of the task processing function, it is better for re-use and maintenance to do a function lookup.

const getFunctionUrl = async (name, location = "us-central1") => {
  const { GoogleAuth } = require("google-auth-library");

  // Authenticate with the default credentials of the running environment.
  const gAuth = new GoogleAuth({
    scopes: "https://www.googleapis.com/auth/cloud-platform"
  });
  const projectId = await gAuth.getProjectId();

  // Cloud Functions API endpoint that describes the named function.
  const url =
    "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await gAuth.getClient();
  const res = await client.request({ url });

  // The function's public URL is in serviceConfig.uri.
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retrieve uri for function at ${url}`);
  }

  console.log("Function URL for Task:", name, uri);

  return uri;
};

All this code does is look up the URI of the function. For example, the lookup might return: https://postTaskFunction-93ks02p21-uc.a.run.app
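Because the lookup makes a network request, you may not want to repeat it on every run. A minimal caching sketch, assuming the URI does not change while a function instance is alive, could look like the following. cacheFunctionUrl is a hypothetical helper, and lookup stands for any async function with the same (name, location) signature as getFunctionUrl.

```javascript
// Cache lookups per location + function name so repeated calls in the
// same instance reuse the first result. `lookup` is whatever async
// function resolves the URI, for example getFunctionUrl above.
const cacheFunctionUrl = (lookup) => {
  const cache = new Map();
  return (name, location = "us-central1") => {
    const key = `${location}/${name}`;
    if (!cache.has(key)) {
      // Store the promise so concurrent callers share one request;
      // drop it on failure so a later call can retry.
      const pending = lookup(name, location).catch((err) => {
        cache.delete(key);
        throw err;
      });
      cache.set(key, pending);
    }
    return cache.get(key);
  };
};
```

Usage would be along the lines of const getCachedFunctionUrl = cacheFunctionUrl(getFunctionUrl), followed by await getCachedFunctionUrl("postTaskFunction").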

Process the Queue Tasks

The final step is to process all the tasks on the queue.

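const { onTaskDispatched } = require("firebase-functions/v2/tasks");

// processPost is the app-specific function that handles a single post.
exports.processScheduledPosts = onTaskDispatched(
  {
    retryConfig: {
      maxAttempts: 1 // one attempt only, so no retries
    },
    rateLimits: {
      maxConcurrentDispatches: 25 // up to 25 tasks run at once
    },
    timeoutSeconds: 480, // 8 minutes, matching dispatchDeadlineSeconds
    memory: "2GiB"
  },
  async (req) => {
    // req.data is the payload that was passed to queue.enqueue().
    const { data } = req;
    return processPost(data);
  }
);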

We use the built-in Firebase function onTaskDispatched to process each individual task. You retrieve the data sent to the function from the request object req by extracting its data property. You can then process that data, e.g. with processPost.
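If the handler throws, the attempt counts as failed and Cloud Tasks decides whether to retry based on the retry settings. One pattern worth considering is to skip payloads that can never succeed while letting real failures propagate. The sketch below is only an illustration: handleTask is a hypothetical wrapper, and the id field is an assumed property of the payload, not something the original code guarantees.

```javascript
// Hypothetical wrapper around the per-task work. `processPost` is passed
// in so the sketch stays self-contained.
const handleTask = async (data, processPost) => {
  // Assumption: a valid payload is an object with an `id` property.
  if (!data || typeof data !== "object" || !data.id) {
    // Retrying cannot fix a malformed payload: log it and finish
    // without throwing.
    console.error("Skipping malformed task payload:", data);
    return { skipped: true };
  }

  // Errors thrown here propagate, so the attempt is recorded as failed.
  const result = await processPost(data);
  return { skipped: false, result };
};
```

Inside the onTaskDispatched callback this would be called as handleTask(req.data, processPost).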

The onTaskDispatched function accepts several options:

  • retryConfig – How many attempts are made if an error occurs.
  • rateLimits – The number of concurrent processes allowed.
  • timeoutSeconds – The number of seconds before the function times out.
  • memory – How much memory is allocated to the function. Please see the Firebase cloud function pricing.
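For a queue that should retry failed tasks and throttle how fast they start, the options object might look like the sketch below. The values are placeholders rather than recommendations, and the fields minBackoffSeconds, maxBackoffSeconds, and maxDispatchesPerSecond do not appear in the example above, so check the firebase-functions reference for your version before relying on them.

```javascript
// Options object in the shape accepted by onTaskDispatched.
// The values are placeholders, not recommendations.
const taskQueueOptions = {
  retryConfig: {
    maxAttempts: 3, // total attempts, including the first
    minBackoffSeconds: 30, // shortest wait before a retry
    maxBackoffSeconds: 300 // longest wait before a retry
  },
  rateLimits: {
    maxConcurrentDispatches: 25, // tasks running at the same time
    maxDispatchesPerSecond: 10 // tasks started per second
  },
  timeoutSeconds: 60 * 8, // keep in line with dispatchDeadlineSeconds
  memory: "2GiB"
};
```

It would be passed as the first argument, as in onTaskDispatched(taskQueueOptions, handler).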

Firebase Queue Logging in Logs Explorer

Another awesome advantage of using the Cloud Task queues is the segmentation of logs per task.

In the Google Cloud Logs Explorer, find an entry that the task logged:

A screen shot of a screen showing the process schedule for a post.

Click on the four blue lines:

A screen showing the entries for a trace.

and select “Show entries for this trace”. You’ll now only see the logs for that particular cloud task, a major advantage over trying to decipher mixed logs. Be sure to remove any entries in the search field except for the “trace=…”.

When Should You Use (and Not Use) Firebase Enqueue Tasks?

If you need to process tasks independently, asynchronously, and concurrently, Firebase queues with Cloud Tasks are a great fit. Even if you have simple queuing needs, you can use Cloud Tasks. It really is easy once you get everything set up.

The caveat is cost. Because you’re calling a Firebase function for every queue task, you’ll be paying for every run. You’ll need to weigh the cost against the value of these Cloud Tasks, and you might find a simple Firestore queue, as mentioned in the above use-case, meets your needs.

Overall, switching to Firebase Queues with Cloud Tasks was the perfect new architecture for Ayrshare, improving our resiliency, speed, and maintainability.

About Ayrshare

Ayrshare is a social media API that allows you to publish posts, get analytics, manage comments, and send direct messages on the social networks directly from your platform. Learn more in our social media API docs.