Using an async iterator on Node.js + S3
There isn't support for async iterators (for await...of) in Node.js v8.9, which is AWS Lambda's runtime.
It's a shame, as it's a great feature that lets you iterate over an iterable whose results arrive asynchronously (e.g. retrieving another page from a database), using a compact for loop that feels synchronous but is actually asynchronous under the covers.
This means that if you want to use a library written specifically around it (e.g. the Amazon DynamoDB QueryPaginator), you have to fall back on a much more verbose syntax. However, with a bit of re-purposing you can use a generator function that yields a Promise each time, and if you await each promise inside the loop it will behave like an async iterator.
for await...of example
const {QueryPaginator} = require('@aws/dynamodb-query-iterator');
const DynamoDB = require('aws-sdk/clients/dynamodb');

const paginator = new QueryPaginator(
    new DynamoDB({region: 'us-west-2'}),
    {TableName: "my_table"}
);

for await (const page of paginator) {
    // do something with `page`
}
Using an async iterable
To use async iterables in Node.js at the moment, you’re forced to do something like this:
let records = [];
// the ugly statement, effectively implementing what the JavaScript engine would
// compile for await...of into
for (let page = await paginator.next(); page && !page.done; page = await paginator.next()) {
    if (page.value) {
        records = records.concat(page.value.Items.map(DynamoDB.Converter.unmarshall));
    }
}
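To make the protocol behind the loop above concrete, here is a minimal sketch that runs without AWS. `fakePaginator` and `collectItems` are hypothetical names invented for this illustration; the fake only mimics the `next()` contract of an async iterator and is not part of any AWS library.

```javascript
// Hypothetical stand-in for a paginator: each call to next() resolves to
// {value, done}, which is the async iterator contract.
function fakePaginator(pages) {
    let index = 0;
    return {
        next() {
            if (index < pages.length) {
                return Promise.resolve({value: pages[index++], done: false});
            }
            return Promise.resolve({value: undefined, done: true});
        }
    };
}

// Drains an async iterator by calling next() by hand, as in the loop above.
async function collectItems(paginator) {
    let records = [];
    for (let page = await paginator.next(); !page.done; page = await paginator.next()) {
        records = records.concat(page.value.Items);
    }
    return records;
}
```

Calling `collectItems(fakePaginator([{Items: [1, 2]}, {Items: [3]}]))` resolves to `[1, 2, 3]`.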
A clean way to mimic an async iterator
Creating an implementation close to what we want is relatively simple, although the iteration protocols it has to follow can seem a bit complicated.
The solution uses a generator, which is already both an Iterator and an Iterable. The caveat is that each object it yields is a promise that has to be awaited before the next iteration of the for loop runs. To use it:
const S3 = require('aws-sdk/clients/s3');
const s3 = new S3({region: process.env.AWS_REGION || "ap-southeast-2"});

let params = {Bucket: "my-bucket", Prefix: "startsWith/"};
for (let response of new S3Paginator(s3, params)) {
    let value = await response;
}
And the iterator itself (adapted from DynamoDbPaginator):
class S3Paginator {
    /**
     * @param {S3} client
     * @param {S3.ListObjectsV2Request} input
     */
    constructor(client, input) {
        this.client = client;
        this.nextRequest = {...input};
        this.lastResolved = Promise.resolve();
    }

    /**
     * An imperfect async iterator: you need to await each response before you can get its value.
     * @return {Promise<S3.ListObjectsV2Output>}
     */
    * [Symbol.iterator]() {
        while (true) {
            this.lastResolved = this.getNext().then(({value}) => value);
            if (!this.nextRequest) {
                return this.lastResolved;
            } else {
                yield this.lastResolved;
            }
        }
    }

    /** @return {Promise<{value?:S3.ListObjectsV2Output, done: boolean}>} */
    async getNext() {
        await this.lastResolved;
        if (this.nextRequest) {
            let output = await this.client.listObjectsV2({...this.nextRequest}).promise();
            if (this.nextRequest && output.NextContinuationToken) {
                this.nextRequest = {
                    ...this.nextRequest,
                    ContinuationToken: output.NextContinuationToken
                };
            } else {
                this.nextRequest = undefined;
            }
            return {
                value: output,
                done: false
            };
        }
        return {done: true};
    }
}
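The same idea can be tried without an S3 bucket. The following is a stripped-down sketch of the technique, not the class above: `makeFakeClient`, `listPage`, `paginate` and `listAllKeys` are hypothetical names, and the fake client simply serves pages from an array using the page index as the continuation token.

```javascript
// Hypothetical page source standing in for s3.listObjectsV2(...).promise():
// resolves to {Contents, NextContinuationToken} for a given token.
function makeFakeClient(pages) {
    return {
        listPage(token) {
            const index = token === undefined ? 0 : token;
            const next = index + 1 < pages.length ? index + 1 : undefined;
            return Promise.resolve({Contents: pages[index], NextContinuationToken: next});
        }
    };
}

// A plain (synchronous) generator that yields one promise per page.
// It only learns whether another page exists once the previous promise has
// settled, so the caller must await each yielded promise before looping again.
function* paginate(client) {
    let token;
    let finished = false;
    while (!finished) {
        yield client.listPage(token).then((output) => {
            token = output.NextContinuationToken;
            finished = token === undefined;
            return output;
        });
    }
}

async function listAllKeys(client) {
    const keys = [];
    for (const response of paginate(client)) {
        const page = await response; // required: lets the generator see the next token
        keys.push(...(page.Contents || []));
    }
    return keys;
}
```

With `makeFakeClient([['a', 'b'], ['c']])`, `listAllKeys` resolves to `['a', 'b', 'c']`. Skipping the `await` inside the loop would leave the generator unable to tell when the last page has been reached, which is the caveat described above.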
Turning it into a real async iterable
If you want to use this somewhere that supports async iteration (e.g. a browser), turning it into a proper async iterator only requires adding these two methods:
/** @return {Promise<{value?:S3.ListObjectsV2Output, done: boolean}>} */
next() {
    this.lastResolved = this.getNext();
    return this.lastResolved;
}

/** @return {AsyncIterableIterator<S3.ListObjectsV2Output>} */
[Symbol.asyncIterator]() {
    return this;
}
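As a rough illustration of why those two methods are enough, here is a small sketch that can be run on any runtime with `for await...of` support (Node.js 10 or later, for example). `ArrayPaginator` and `collectPages` are hypothetical and serve pages from an in-memory array instead of calling S3.

```javascript
// Hypothetical paginator over an in-memory list of pages, used only to
// illustrate the two methods the async iteration protocol asks for.
class ArrayPaginator {
    constructor(pages) {
        this.pages = pages;
        this.index = 0;
    }

    // Async iterator protocol: next() returns a promise of {value, done}.
    next() {
        if (this.index < this.pages.length) {
            return Promise.resolve({value: this.pages[this.index++], done: false});
        }
        return Promise.resolve({value: undefined, done: true});
    }

    // Async iterable protocol: return the iterator, here the object itself.
    [Symbol.asyncIterator]() {
        return this;
    }
}

// Consumes any async iterable with for await...of and returns what it saw.
async function collectPages(paginator) {
    const seen = [];
    for await (const page of paginator) {
        seen.push(page);
    }
    return seen;
}
```

`collectPages(new ArrayPaginator(['p1', 'p2']))` resolves to `['p1', 'p2']`; the loop stops as soon as `next()` reports `done: true`.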
I started putting some functions like this into a package: @jcorieo/aws-sdk-async-iterables
https://github.com/jcoreio/aws-utils/tree/master/packages/aws-sdk-async-iterables
It's much cleaner to implement everything with async generators:
import * as AWS from 'aws-sdk'

export async function* listObjectsV2(
  s3: AWS.S3,
  params: AWS.S3.ListObjectsV2Request
): AsyncIterable<AWS.S3.Object> {
  // copy so the caller's params object is never mutated
  params = { ...params }
  let result
  do {
    result = await s3.listObjectsV2(params).promise()
    if (result.Contents) yield* result.Contents
    if (result.NextContinuationToken) {
      delete params.StartAfter
      params.ContinuationToken = result.NextContinuationToken
    }
  } while (result.IsTruncated)
}
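One practical benefit of the async generator is that pages are only fetched as the consumer asks for more items. The sketch below tries to show this with a plain JavaScript port of the generator above and a hypothetical stub client; `makeStubS3` and `findFirstKey` are invented for the example, and the stub only imitates the `listObjectsV2(params).promise()` call shape of the aws-sdk v2 client.

```javascript
// Hypothetical stub with the same call shape as the aws-sdk v2 client:
// listObjectsV2(params).promise() resolves to one page of results.
function makeStubS3(pages) {
    const calls = [];
    return {
        calls,
        listObjectsV2(params) {
            calls.push({...params});
            const index = params.ContinuationToken ? Number(params.ContinuationToken) : 0;
            const hasMore = index + 1 < pages.length;
            return {
                promise: () => Promise.resolve({
                    Contents: pages[index],
                    IsTruncated: hasMore,
                    NextContinuationToken: hasMore ? String(index + 1) : undefined
                })
            };
        }
    };
}

// Plain JavaScript port of the generator above, repeated so the sketch runs alone.
async function* listObjectsV2(s3, params) {
    params = {...params};
    let result;
    do {
        result = await s3.listObjectsV2(params).promise();
        if (result.Contents) yield* result.Contents;
        if (result.NextContinuationToken) {
            delete params.StartAfter;
            params.ContinuationToken = result.NextContinuationToken;
        }
    } while (result.IsTruncated);
}

// Returns the first key matching the predicate; leaving the loop early
// stops the generator, so later pages are never requested.
async function findFirstKey(s3, params, predicate) {
    for await (const object of listObjectsV2(s3, params)) {
        if (predicate(object.Key)) return object.Key;
    }
    return undefined;
}
```

If the match sits on the first page, the stub's `calls` array ends up with a single entry, because the remaining pages are never requested.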