How to create a Control Table (for ingestion of files in S3 bucket)

Igorps
5 min read · Aug 23, 2023


The main objective of this post is to present one of the many possible ways to track the files that land in the Transient Zone (ingestion zone) in AWS.

Services Used:

  1. AWS CLI (PowerShell)
  2. EventBridge Scheduler (sets a time to start the job)
  3. Lambda (contains the script that tracks all the files added to the Ingestion Zone)
  4. S3 (ingested files are stored in a bucket folder called Transient Zone)

1) AWS CLI:

The AWS CLI sends files from a local folder on the PC (on-premises) to an S3 bucket (cloud environment).

Open PowerShell and type:

aws configure

Set your Access Key ID and Secret Access Key when prompted:
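Running aws configure walks through four prompts. A minimal sketch of the interaction, where the credentials, region and output format below are placeholders you replace with your own values:

AWS Access Key ID [None]: YOUR_ACCESS_KEY_ID
AWS Secret Access Key [None]: YOUR_SECRET_ACCESS_KEY
Default region name [None]: us-east-1
Default output format [None]: json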

Examples:

List all the buckets in the account:

aws s3 ls

List files inside the Transient Zone bucket:

aws s3 ls bkt-musicstream-bi/Files/TransientZone/

Send files to the Transient Zone bucket:

  • Option: --recursive (Note: sends all the files inside the folder)
aws s3 cp c:\Users\YourUser\CaminhoPasta\ s3://bkt-musicstream-bi/Files/TransientZone/ --recursive
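If only certain file types should be sent, aws s3 cp also accepts --exclude and --include filters, which are applied in order. A sketch that copies only the .csv files from the same local folder (everything is excluded first, then the .csv files are included back):

aws s3 cp c:\Users\YourUser\CaminhoPasta\ s3://bkt-musicstream-bi/Files/TransientZone/ --recursive --exclude "*" --include "*.csv"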

For further information, see this article:

How To Upload Files to S3 with the AWS CLI

2) EventBridge Scheduler:

For this example, we will use EventBridge Scheduler to start a Lambda function that catalogues all the files ingested into the Transient Zone bucket.

The main point is to schedule a job that runs from Monday to Friday to catalogue the files in S3.

The scheduler will be set as: “At 03:00 on every day-of-month from 1 through 31 and on every day-of-week from Monday through Friday.”

Cron Code Equivalent: 0 3 1-31 * 1-5
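Note that EventBridge Scheduler itself uses a six-field cron syntax and expects a ? in either the day-of-month or the day-of-week field, so an equivalent schedule expression for 03:00 on weekdays (in the schedule's configured timezone, UTC by default) would look like:

cron(0 3 ? * MON-FRI *)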

For further information about Cron:

Oracle Documentation

Cron Generator
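As a rough sketch, the schedule could also be created from the CLI, assuming the Lambda function and an IAM role allowed to invoke it already exist (the schedule name and both ARNs below are placeholders):

aws scheduler create-schedule --name catalogue-transient-zone --schedule-expression "cron(0 3 ? * MON-FRI *)" --flexible-time-window Mode=OFF --target Arn=arn:aws:lambda:REGION:ACCOUNT_ID:function:YOUR_FUNCTION,RoleArn=arn:aws:iam::ACCOUNT_ID:role/YOUR_SCHEDULER_ROLE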

3) Lambda Function:

This Lambda function runs Python code that lists all files in the Transient Zone bucket and creates a table with the following information:

  • FileName (name of the file)
  • IngestionTimestamp (when the file was last modified / ingested)
  • FileType (csv, for example)
  • FileSize (MB)
  • BucketSource (source the information came from, in this case the Transient Zone)

Layers that need to be included for the Lambda to run properly:

Pandas, Numpy and Pytz.

For further information about including layers in a Lambda:

Medium post: Easiest Lambda Layer for Python Functions

Github with layers repository
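As a sketch, published layers can also be attached to the function from the CLI. Note that --layers replaces the whole layer list, so pass every ARN in one call (the function name and layer ARNs below are placeholders):

aws lambda update-function-configuration --function-name ingestion-control-table --layers arn:aws:lambda:REGION:ACCOUNT_ID:layer:PANDAS_LAYER:1 arn:aws:lambda:REGION:ACCOUNT_ID:layer:PYTZ_LAYER:1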

Lambda Code

# Import libraries
import os
from io import StringIO
from datetime import datetime

import boto3
import pandas as pd  # provided by a Lambda layer
import pytz  # provided by a Lambda layer


def lambda_handler(event, context):
    # Current processing timestamp in the America/Sao_Paulo timezone
    agora = datetime.now(pytz.timezone('America/Sao_Paulo'))
    dthproc = agora.strftime("%Y%m%d%H%M%S")

    # Create a Boto3 client for S3
    s3_client = boto3.client('s3')

    # Specify the S3 bucket and prefix (subfolder) to catalogue
    bucket_name = 'bkt-musicstream-bi'
    subfolder = 'Files/TransientZone/'

    # List objects in the subfolder
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=subfolder)

    # Extract file name, last-modified timestamp, extension, size and source bucket
    files_info = [
        (
            os.path.basename(obj['Key']),
            obj['LastModified'],
            os.path.splitext(os.path.basename(obj['Key']))[1],
            obj.get('Size', 0),
            f"s3://{bucket_name}"
        )
        for obj in response.get('Contents', [])
        if not obj['Key'].endswith('/')
    ]

    # Format the timestamps and sizes and add them to formatted_files_info
    formatted_files_info = []
    for file_name, timestamp, file_type, file_size, bucket_source in files_info:
        formatted_timestamp = timestamp.astimezone(pytz.timezone('America/Sao_Paulo')).strftime("%Y%m%d%H%M%S")
        formatted_file_size = f"{file_size / (1024 * 1024):.2f} MB"  # Convert bytes to MB, two decimal places
        formatted_files_info.append((file_name, formatted_timestamp, file_type, formatted_file_size, bucket_source))

    # Create a DataFrame with the file names, timestamps, types, sizes and source
    df_new = pd.DataFrame(
        formatted_files_info,
        columns=["FileName", "IngestionTimestamp", "FileType", "FileSize (MB)", "BucketSource"]
    )

    # Convert 'IngestionTimestamp' to int64 so it matches the column read back from the existing CSV
    df_new['IngestionTimestamp'] = df_new['IngestionTimestamp'].astype('int64')

    # Serialize the new data as CSV and store it in a variable
    csv_data_new = df_new.to_csv(index=False)

    # Specify the destination S3 bucket and file path
    destination_bucket_name = 'bkt-musicstream-bi'
    destination_file_path = 'Files/IngestionControl/Ingestion_Control_Table.csv'

    # Check if the control table already exists in the destination bucket
    try:
        existing_data = s3_client.get_object(Bucket=destination_bucket_name, Key=destination_file_path)['Body'].read()
        existing_data_str = existing_data.decode('utf-8')
        df_existing = pd.read_csv(StringIO(existing_data_str))

        # Combine the DataFrames and remove duplicate rows
        df_combined = pd.concat([df_existing, df_new]).drop_duplicates()

        csv_data_combined = df_combined.to_csv(index=False)
    except s3_client.exceptions.NoSuchKey:
        # If the file doesn't exist yet, use the new data directly
        csv_data_combined = csv_data_new

    # Upload the combined CSV data to the destination bucket
    s3_client.put_object(Bucket=destination_bucket_name, Key=destination_file_path, Body=csv_data_combined)

    return {
        'statusCode': 200,
        'body': 'Ingestion_Control_Table.csv has been updated in the destination bucket.'
    }
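Once deployed, the function can be tested from the CLI before wiring it to the scheduler (the function name here is a placeholder):

aws lambda invoke --function-name ingestion-control-table response.json

The response.json file will contain the returned statusCode and body.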

The generated .csv file, Ingestion_Control_Table.csv, will be stored inside the IngestionControl folder.

4) Generated Control Table as .csv file:

Column info (a sample row is shown after this list):

  • FileName
  • IngestionTimestamp (yyyymmddhhmmss, as formatted by the Lambda)
  • FileType (.csv, .xlsx, .txt, .jpeg, for example)
  • FileSize (MB)
  • BucketSource
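To illustrate the layout, a row of the control table would look something like this (the values are made-up placeholders, not real output):

FileName,IngestionTimestamp,FileType,FileSize (MB),BucketSource
customers.csv,20230823031502,.csv,1.25 MB,s3://bkt-musicstream-bi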

I hope that this article was helpful.

Further information about appending new bucket and file data to this table will be covered soon.
