AWS CDK: Amazon EventBridge Pipes and SQS

December 10th, 2023

When managing continuous events, Amazon EventBridge is a powerful and fully managed service. With Amazon EventBridge Pipes it’s easy to organize, structure, and transform incoming data messages. Using the AWS Cloud Development Kit, this guide will configure an Amazon SQS Queue and use Amazon EventBridge Pipes for data processing and transformation.

Basic Components

To get started, create a new CDK application and a CloudFormation Stack for the needed resources: an Amazon EventBridge Event Bus and an Amazon SQS Queue.

import { App, Stack } from 'aws-cdk-lib';
import { EventBus } from 'aws-cdk-lib/aws-events';
import { Queue } from 'aws-cdk-lib/aws-sqs';

const app = new App()
const stack = new Stack(app, 'example')

const bus = new EventBus(stack, 'bus')
const queue = new Queue(stack, 'queue')

Next, configure an Amazon EventBridge Pipe that reads messages from queue and submits them as events to bus.

EventBridge Pipe IAM Role

The needed CloudFormation resource is available in the AWS Cloud Development Kit as CfnPipe. But first, create the IAM Role that the pipe will assume:

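import { Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';

const pipeRolePolicyQueue = new PolicyDocument({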
  statements: [
    new PolicyStatement({
      effect: Effect.ALLOW,
      resources: [ queue.queueArn ],
      actions: [ 'sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes' ],
    }),
  ],
})

const pipeRolePolicyBus = new PolicyDocument({
  statements: [
    new PolicyStatement({
      effect: Effect.ALLOW,
      resources: [ bus.eventBusArn ],
      actions: [ 'events:PutEvents' ],
    }),
  ],
})

const pipeRole = new Role(stack, 'role', {
  assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
  inlinePolicies: {
    queue: pipeRolePolicyQueue,
    bus: pipeRolePolicyBus,
  },
})

With this IAM Role in place, you can create the Amazon EventBridge Pipe.

Create EventBridge Pipe

Next, use the provided CfnPipe resource:

import { CfnPipe } from 'aws-cdk-lib/aws-pipes';

new CfnPipe(stack, 'pipe', {
  roleArn: pipeRole.roleArn,
  source: queue.queueArn,
  sourceParameters: {
    sqsQueueParameters: {
      batchSize: 1,
      maximumBatchingWindowInSeconds: 120,
    },
  },
  target: bus.eventBusArn,
  targetParameters: {
    eventBridgeEventBusParameters: {
      detailType: 'detail-type',
      source: `your.incoming.data`,
    },
    inputTemplate: `{
      "message": "<$.body.message>"
    }`,
  },
})
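
To make the inputTemplate more concrete, here is a minimal sketch in plain TypeScript of how a placeholder such as <$.body.message> could resolve against an incoming SQS record. It is a simplified simulation and not the service's implementation. It assumes the JSON message body has already been parsed into an object, and the names SqsRecordSketch, resolvePath, and renderTemplate are hypothetical helpers made up for this illustration.

```typescript
// Simplified stand-in for the record a pipe receives from SQS.
// Only the fields used here are modelled; the real record has more.
interface SqsRecordSketch {
  messageId: string;
  body: unknown; // assumption: the JSON body is already parsed
}

// Hypothetical helper: walks a dotted path like "$.body.message".
function resolvePath(record: unknown, path: string): unknown {
  const keys = path.replace(/^\$\.?/, '').split('.').filter((k) => k.length > 0);
  let current: unknown = record;
  for (const key of keys) {
    if (current === null || typeof current !== 'object') {
      return undefined;
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Hypothetical helper: replaces every <$.path> placeholder in the template.
// Missing paths become an empty string in this sketch; the real service may differ.
function renderTemplate(template: string, record: unknown): string {
  return template.replace(/<(\$[^>]*)>/g, (_match: string, path: string) => {
    const value = resolvePath(record, path);
    if (value === undefined) {
      return '';
    }
    return typeof value === 'string' ? value : JSON.stringify(value);
  });
}

const record: SqsRecordSketch = {
  messageId: 'example-id',
  body: { message: 'lorem ipsum' },
};

const template = `{
  "message": "<$.body.message>"
}`;

const rendered = renderTemplate(template, record);
console.log(rendered);
```

With the sample record above, the rendered template is a JSON document whose message field carries the text from the SQS message body.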

After deployment, you can use the AWS CLI to send messages to the SQS queue, which will be processed by the EventBridge Pipe, and submitted to your EventBridge Bus afterwards.

$ aws sqs send-message \
    --queue-url https://sqs.eu-central-1.amazonaws.com/12345EXAMPLE/YourQueue \
    --message-body "{\"message\": \"lorem ipsum\"}"
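
Hand-escaping quotes in the message body gets error-prone once payloads grow. As a small sketch, the same arguments could be assembled in TypeScript with JSON.stringify. The helper name buildSendMessageArgs is hypothetical, and the queue URL is the placeholder from the command above.

```typescript
// Hypothetical helper: builds the argument list for `aws sqs send-message`.
// JSON.stringify produces the message body, so no manual escaping is needed.
function buildSendMessageArgs(queueUrl: string, payload: Record<string, unknown>): string[] {
  return [
    'sqs',
    'send-message',
    '--queue-url',
    queueUrl,
    '--message-body',
    JSON.stringify(payload),
  ];
}

const args = buildSendMessageArgs(
  'https://sqs.eu-central-1.amazonaws.com/12345EXAMPLE/YourQueue',
  { message: 'lorem ipsum' },
);

// Print the arguments one per line for inspection.
console.log(args.join('\n'));
```

The resulting array can be handed to a process runner such as Node's execFile with aws as the command, which passes each argument through without shell quoting.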

That’s it! 🎉 Using the AWS CLI works fine for a first example. For easier integration with other services, you might additionally want to configure an Amazon API Gateway integration for SQS, which provides a simple HTTP interface for incoming events.