s3Firehose

Amazon Kinesis Data Firehose is a fully managed service for streaming data to S3 with automatic scaling, data transformation, and format conversion. It enables real-time analytics by delivering streaming data to data lakes, with built-in compression, partitioning, and optional Parquet conversion for cost-effective storage.

import * as saws from "@stackattack/aws";

const ctx = saws.context();
const firehose = saws.s3Firehose(ctx, {
  bucket: "my-analytics-bucket",
  prefix: "events/"
});

export const deliveryStreamArn = firehose.firehose.arn;

Send data to Firehose using the AWS SDK or other AWS services:

// Using the AWS SDK v3; Record.Data must be binary, so encode the JSON payload.
// A trailing newline keeps records delimited within delivered S3 objects.
import { FirehoseClient, PutRecordCommand } from "@aws-sdk/client-firehose";

const client = new FirehoseClient({ region: "us-east-1" });

await client.send(new PutRecordCommand({
  DeliveryStreamName: "my-s3-firehose",
  Record: {
    Data: Buffer.from(JSON.stringify({
      timestamp: new Date().toISOString(),
      userId: "user123",
      event: "page_view",
      properties: { page: "/home" }
    }) + "\n")
  }
}));
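
For higher-volume producers, here is a sketch using PutRecordBatch, which accepts up to 500 records per call (the stream name is reused from the example above):

import { FirehoseClient, PutRecordBatchCommand } from "@aws-sdk/client-firehose";

const client = new FirehoseClient({ region: "us-east-1" });

const events = [
  { event: "page_view", page: "/home" },
  { event: "page_view", page: "/pricing" },
];

const result = await client.send(new PutRecordBatchCommand({
  DeliveryStreamName: "my-s3-firehose",
  // Newline-delimit records so they separate cleanly in delivered S3 objects
  Records: events.map((e) => ({ Data: Buffer.from(JSON.stringify(e) + "\n") })),
}));

// PutRecordBatch is not atomic: check FailedPutCount and retry any failures
if (result.FailedPutCount) {
  console.warn(`${result.FailedPutCount} records failed; retry them`);
}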

Monitor delivery and analyze data:

# Check delivery stream status
aws firehose describe-delivery-stream --delivery-stream-name my-s3-firehose
# View delivered data in S3
aws s3 ls s3://my-analytics-bucket/events/ --recursive
# Query with Amazon Athena (if using Parquet); an output location is required
aws athena start-query-execution \
  --query-string "SELECT * FROM events WHERE year='2024' LIMIT 10" \
  --result-configuration "OutputLocation=s3://my-analytics-bucket/athena-results/"

S3 firehoses work together with other Stackattack components:

  • bucket - Buckets are used to store the data that’s written to the firehose.
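
For example, a minimal sketch passing a Stackattack bucket directly (this assumes the bucket component accepts default arguments; check its documentation for specifics):

import * as saws from "@stackattack/aws";

const ctx = saws.context();
// Assumption: saws.bucket(ctx) creates a bucket usable as a BucketInput
const dataBucket = saws.bucket(ctx);
const firehose = saws.s3Firehose(ctx, {
  bucket: dataBucket,
  prefix: "events/",
});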

Firehose pricing is based on data volume with no upfront costs:

  • Data ingestion: $0.029 per GB ingested
  • Format conversion: $0.018 per GB converted (JSON to Parquet)
  • Data transfer: Standard AWS transfer rates
  • S3 storage: Standard S3 pricing for stored data
  • Compression: Can reduce storage costs by 70-90%
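
As a rough worked example at these rates, ingesting 1,000 GB per month with Parquet conversion runs about 1,000 × $0.029 + 1,000 × $0.018 ≈ $47, before S3 storage and data transfer.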

Cost optimization strategies:

  • Use GZIP compression (default) to reduce S3 storage costs
  • Implement appropriate buffering (size/time) to optimize delivery efficiency
  • Use Parquet format for analytics workloads to reduce query costs in Athena
  • Partition data by timestamp to enable efficient querying and lifecycle policies (see the configuration sketch after this list)
  • Monitor error rates to avoid paying for failed delivery attempts
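
A configuration sketch combining several of these strategies (the Glue table ARN is a placeholder for one in your own catalog):

import * as saws from "@stackattack/aws";

const ctx = saws.context();
const firehose = saws.s3Firehose(ctx, {
  bucket: "my-analytics-bucket",
  prefix: "events/",
  // Larger buffers produce fewer, bigger S3 objects
  bufferSize: 128,
  bufferInterval: 300,
  // Day-granularity partitioning supports efficient Athena queries and lifecycle rules
  timestampPartitioning: "day",
  // Convert JSON to Parquet against an existing Glue table schema (placeholder ARN)
  glueParquetTableArn: "arn:aws:glue:us-east-1:123456789012:table/analytics/events",
});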

Creates a Kinesis Firehose delivery stream that delivers data to S3 with optional partitioning and Parquet conversion.

function s3Firehose(ctx: Context, args: S3FirehoseArgs): { errorUrl: Output<string>; firehose: FirehoseDeliveryStream; url: Output<string> }
  • ctx (Context) - The context for resource naming and tagging
  • args (S3FirehoseArgs) - Configuration arguments for the Firehose delivery stream
  • ({ errorUrl: Output<string>; firehose: FirehoseDeliveryStream; url: Output<string> }) - The delivery stream resource plus S3 URLs for delivered data (url) and error output (errorUrl)
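
For reference, a minimal sketch of consuming the returned outputs (names taken from the signature above):

import * as saws from "@stackattack/aws";

const ctx = saws.context();
const firehose = saws.s3Firehose(ctx, { bucket: "my-analytics-bucket" });

export const dataUrl = firehose.url;       // S3 URL where successful deliveries land
export const errorUrl = firehose.errorUrl; // S3 URL for failed-delivery output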

Creates an IAM policy document that grants Firehose access to read from a Kinesis stream.

function s3FirehoseKinesisPolicy(kinesisStreamArn: Input<string>): Output<GetPolicyDocumentResult>
  • kinesisStreamArn (Input<string>) - ARN of the Kinesis stream to grant access to
  • (Output<GetPolicyDocumentResult>) - The policy document granting Firehose read access to the Kinesis stream
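
As a sketch of wiring this policy to a role with Pulumi (the stream and role here are illustrative, not created by Stackattack):

import * as aws from "@pulumi/aws";
import * as saws from "@stackattack/aws";

// Illustrative Kinesis stream and IAM role; in practice the role would be
// the one your Firehose delivery stream assumes
const stream = new aws.kinesis.Stream("events", { shardCount: 1 });
const role = new aws.iam.Role("firehose-role", {
  assumeRolePolicy: JSON.stringify({
    Version: "2012-10-17",
    Statement: [{
      Effect: "Allow",
      Principal: { Service: "firehose.amazonaws.com" },
      Action: "sts:AssumeRole",
    }],
  }),
});

const policyDoc = saws.s3FirehoseKinesisPolicy(stream.arn);
new aws.iam.RolePolicy("firehose-kinesis", {
  role: role.id,
  policy: policyDoc.apply((d) => d.json),
});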

Creates an IAM policy document that grants Firehose access to S3, CloudWatch Logs, and optionally Glue.

function s3FirehosePolicy(args: S3FirehosePolicyArgs): Output<GetPolicyDocumentResult>
  • args (S3FirehosePolicyArgs) - Configuration arguments specifying the bucket, log stream, and optional Glue table
  • (Output<GetPolicyDocumentResult>) - The policy document granting Firehose access to S3, CloudWatch Logs, and optionally Glue

Configuration arguments (S3FirehoseArgs) for creating a Kinesis Firehose delivery stream to S3:

  • bucket (Input<BucketInput>) - The S3 bucket to deliver data to
  • bufferInterval? (Input<number>) - Buffering interval in seconds (defaults to 900)
  • bufferSize? (Input<number>) - Buffering size in MB (defaults to 64)
  • dynamicPartitioningFields? (Record<string, Input<string>>) - JQ queries for dynamic partitioning by field values
  • errorPrefix? (Input<string>) - S3 prefix for error outputs (defaults to prefix + "error/")
  • glueParquetTableArn? (Input<string>) - Optional Glue table ARN for Parquet conversion
  • logRetentionDays? (Input<number>) - CloudWatch log retention in days (defaults to 365)
  • noPrefix? (boolean) - Whether to skip adding a prefix to the resource name
  • partitionErrorsByType? (boolean) - Whether to partition errors by type
  • prefix? (Input<string>) - S3 prefix for successful deliveries
  • timestampPartitioning? ("year" | "month" | "day" | "hour") - Timestamp-based partitioning granularity
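
A sketch exercising the dynamic-partitioning arguments (the record field name here is illustrative):

import * as saws from "@stackattack/aws";

const ctx = saws.context();
const firehose = saws.s3Firehose(ctx, {
  bucket: "my-analytics-bucket",
  prefix: "events/",
  // JQ queries that extract partition values from each record
  dynamicPartitioningFields: {
    event_type: ".event",
  },
  partitionErrorsByType: true,
  errorPrefix: "events-errors/",
});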

Arguments (S3FirehosePolicyArgs) for creating an IAM policy for S3 Firehose access:

  • bucket (Input<BucketInput>) - The S3 bucket to grant access to
  • glueParquetTableArn? (Input<string>) - Optional ARN of a Glue table for Parquet conversion
  • logStreamArn (Input<string>) - ARN of the CloudWatch log stream for logging
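
A minimal sketch of calling s3FirehosePolicy directly (the log group and stream are created here only for illustration):

import * as aws from "@pulumi/aws";
import * as saws from "@stackattack/aws";

const logGroup = new aws.cloudwatch.LogGroup("firehose-logs");
const logStream = new aws.cloudwatch.LogStream("firehose-delivery", {
  logGroupName: logGroup.name,
});

const policyDoc = saws.s3FirehosePolicy({
  bucket: "my-analytics-bucket",
  logStreamArn: logStream.arn,
});

export const policyJson = policyDoc.apply((d) => d.json);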