Using an async iterator on Node.js + S3

Node.js v8.9, which is AWS Lambda's runtime, doesn't support async iterators (for await...of). It's a shame, as it's a great feature: it lets you iterate over an iterable whose results arrive asynchronously (e.g. retrieving another page from a database) using a compact for loop that feels synchronous but actually runs asynchronously under the covers.
This means that if you want to use a library written specifically around async iterators (e.g. the Amazon DynamoDB QueryPaginator), you have to fall back to a much more verbose syntax. However, with a bit of re-purposing you can use a generator function that yields Promises; if you await each promise inside the loop, it behaves like an async iterator.

for await...of example

const {QueryPaginator} = require('@aws/dynamodb-query-iterator');
const DynamoDB = require('aws-sdk/clients/dynamodb');
const paginator = new QueryPaginator(new DynamoDB({region: 'us-west-2'}), {TableName: "my_table"});

for await (const page of paginator) {
  // do something with `page`
}

Using an async iterable

To consume an async iterable in Node.js today, you’re forced to do something like this:
let records = [];
// the ugly statement, effectively implementing what the JavaScript engine would
// compile for await...of into
for (let page = await paginator.next(); page && !page.done; page = await paginator.next()) {
  if (page.value) {
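    // wrap unmarshall so map's extra (index, array) arguments aren't passed as its options
    records = records.concat(page.value.Items.map(item => DynamoDB.Converter.unmarshall(item)));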
  }
}
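
The loop above can be wrapped in a small helper so the verbose statement lives in one place. This is only a minimal sketch, not part of any AWS library: `collectPages` and `fakePaginator` are hypothetical names, and the fake paginator merely stands in for any object whose `next()` resolves to `{value, done}`.

```javascript
// Hypothetical helper: drains anything with a next() method that returns a
// promise of {value, done}, without needing for await...of syntax.
async function collectPages(iterator) {
  const pages = [];
  for (let step = await iterator.next(); !step.done; step = await iterator.next()) {
    pages.push(step.value);
  }
  return pages;
}

// Stand-in for a real paginator (an assumption for illustration only):
// resolves one in-memory page per call, then reports done.
function fakePaginator(pages) {
  let index = 0;
  return {
    next() {
      return index < pages.length
        ? Promise.resolve({value: pages[index++], done: false})
        : Promise.resolve({value: undefined, done: true});
    }
  };
}

collectPages(fakePaginator([{Items: [1, 2]}, {Items: [3]}]))
  .then(pages => console.log(pages.length)); // logs 2
```

With a real paginator you would pass it in place of `fakePaginator(...)` and map over the collected pages afterwards.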

A clean way to mimic an async iterator

Creating an implementation close to what we want is relatively simple, although the iteration protocols it has to follow can seem a bit complicated.
The solution uses a generator, which is already both an Iterator and an Iterable. The caveat is that each yielded promise has to be awaited before the next iteration of the for loop runs. To use it:
const S3 = require('aws-sdk/clients/s3');
const s3 = new S3({region: process.env.AWS_REGION || "ap-southeast-2"});

let params = {Bucket: "my-bucket", Prefix: "startsWith/"};
for (let response of new S3Paginator(s3, params)) {
  let value = await response;
}
And the iterator itself (adapted from DynamoDbPaginator):
class S3Paginator {
  /**
   * @param {S3} client
   * @param {S3.ListObjectsV2Request} input
   */
  constructor(client, input) {
    this.client = client;
    this.nextRequest = {...input};
    this.lastResolved = Promise.resolve();
  }

  /**
   * An imperfect async iterator: each yielded value is a promise that you need to await to get the response.
   * @return {IterableIterator<Promise<S3.ListObjectsV2Output>>}
   */
  * [Symbol.iterator]() {
    while (true) {
      this.lastResolved = this.getNext().then(({value}) => value);
      if (!this.nextRequest) {
        return this.lastResolved;
      }
      else {
        yield this.lastResolved;
      }
    }
  }

  /** @return {Promise<{value?:S3.ListObjectsV2Output, done: boolean}>} */
  async getNext() {
    await this.lastResolved;
    if (this.nextRequest) {
      let output = await this.client.listObjectsV2({...this.nextRequest}).promise();

      if (this.nextRequest && output.NextContinuationToken) {
        this.nextRequest = {
          ...this.nextRequest,
          ContinuationToken: output.NextContinuationToken
        };
      }
      else {
        this.nextRequest = undefined;
      }

      return {
        value: output,
        done: false
      };
    }

    return {done: true};
  }
}
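
To try the pattern without touching AWS, here is a condensed sketch of the same idea with the S3 call swapped for an in-memory stand-in. It is not the class above: `PromisePaginator`, `fetchFakePage`, `fakePages` and `listAll` are hypothetical names, and the `{items, nextToken}` page shape is an assumption made for illustration.

```javascript
// Condensed sketch of the same pattern. `fetchPage(token)` is a hypothetical
// function resolving to {items, nextToken}; it stands in for listObjectsV2.
class PromisePaginator {
  constructor(fetchPage) {
    this.fetchPage = fetchPage;
    this.token = undefined;
    this.finished = false;
  }

  // A plain (synchronous) generator that yields one promise per page.
  // The caller must await each promise before asking for the next one,
  // because `finished` is only updated when the promise settles.
  * [Symbol.iterator]() {
    while (!this.finished) {
      yield this.fetchPage(this.token).then(page => {
        this.token = page.nextToken;
        this.finished = !page.nextToken;
        return page.items;
      });
    }
  }
}

// In-memory stand-in for a paginated service: three pages keyed by token.
const fakePages = {
  start: {items: ['a', 'b'], nextToken: 'p2'},
  p2: {items: ['c'], nextToken: 'p3'},
  p3: {items: ['d'], nextToken: undefined}
};
const fetchFakePage = token => Promise.resolve(fakePages[token || 'start']);

async function listAll() {
  const all = [];
  for (const pending of new PromisePaginator(fetchFakePage)) {
    all.push(...(await pending)); // awaiting here is what keeps the loop in step
  }
  return all;
}

listAll().then(all => console.log(all)); // logs [ 'a', 'b', 'c', 'd' ]
```

If the `await` inside the loop is dropped, the generator is asked for the next promise before the previous page has updated the token, which is exactly the caveat described earlier.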

Turning it into a real async iterable

If you want to use this somewhere that supports async iteration (e.g. a browser), turning it into a proper async iterator only requires adding these two methods to the class:
/** @return {Promise<{value?:S3.ListObjectsV2Output, done: boolean}>} */
next() {
  this.lastResolved = this.getNext();
  return this.lastResolved;
}

/** @return {AsyncIterableIterator<S3.ListObjectsV2Output>} */
[Symbol.asyncIterator]() {
  return this;
}
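
For reference, this is roughly what consuming such an object looks like on a runtime that does support for await...of. The sketch is self-contained and deliberately avoids S3: `ArrayPager` and `countKeys` are hypothetical names, and the pages are plain in-memory objects that only borrow the `KeyCount` field from the listObjectsV2 response.

```javascript
// Hypothetical minimal async iterable: pages come from an in-memory array
// instead of S3, but next() and [Symbol.asyncIterator]() have the same shape.
class ArrayPager {
  constructor(pages) {
    this.pages = pages.slice();
  }

  // Async iterator protocol: resolve to {value, done}.
  next() {
    return this.pages.length > 0
      ? Promise.resolve({value: this.pages.shift(), done: false})
      : Promise.resolve({value: undefined, done: true});
  }

  // Async iterable protocol: hand back an object that has next().
  [Symbol.asyncIterator]() {
    return this;
  }
}

async function countKeys(pager) {
  let count = 0;
  for await (const page of pager) {
    count += page.KeyCount;
  }
  return count;
}

countKeys(new ArrayPager([{KeyCount: 2}, {KeyCount: 3}]))
  .then(count => console.log(count)); // logs 5
```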

Comments

  1. I started putting some functions like this into a package: @jcoreio/aws-sdk-async-iterables
    https://github.com/jcoreio/aws-utils/tree/master/packages/aws-sdk-async-iterables

    It's much cleaner to implement everything with async generators:

    export async function* listObjectsV2(
      s3: AWS.S3,
      params: AWS.S3.ListObjectsV2Request
    ): AsyncIterable<AWS.S3.Object> {
      params = { ...params }
      let result
      do {
        result = await s3.listObjectsV2(params).promise()
        if (result.Contents) yield* result.Contents
        if (result.NextContinuationToken) {
          delete params.StartAfter
          params.ContinuationToken = result.NextContinuationToken
        }
      } while (result.IsTruncated)
    }
