Pandas on AWS with AWS Data Wrangler

What is the AWS Data Wrangler library? The GitHub page of the project describes the library as “Pandas on AWS”.

In case you have been living in a cave for a long time, Pandas is an open source data analysis and manipulation tool, built on top of the Python programming language. Pandas is designed to be fast, powerful, flexible and easy to use.

Positioning itself as “Pandas on AWS” immediately raises the bar.

The project is available from the awslabs GitHub organization. On the organization page you can find a number of projects open sourced by AWS, some more widely used or more mature than others. The s2n project, an implementation of the TLS/SSL protocols, is a good example of a mature project.

To date, the AWS Data Wrangler module represents more than 771 commits, 20 contributors, and 52 releases. Versions are released at a sustained pace, and the Python module is currently available in version 1.4.0.

Installation

There are two ways to install the module: with pip or with Conda.

Pip install

To install the module with pip, you can use the following command:

pip install awswrangler

Conda install

If you are a Conda user, instead, you can install the module with the following command:

conda install -c conda-forge awswrangler

Basic usage

Following the introduction of the GitHub readme, here is how to create a basic DataFrame with Pandas:

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "bar"]})

Then import the AWS Data Wrangler module:

import awswrangler as wr

Write data to Amazon S3

Now, let's serialize the data from the DataFrame into a data file stored in an S3 bucket:

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

Easy! An s3 variable at the root of the AWS Data Wrangler module gives access to functions for interacting with S3, in this case to flush the DataFrame to S3.

Read data from Amazon S3

The reverse function is also available to read the data back from S3:

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

You may wonder what can be done with the AWS Data Wrangler package apart from interacting with S3. Let's take a quick tour of some of the library's features to discover its capabilities.

Definition

Here is an accurate definition of the library as displayed in the documentation:

An open-source Python package that extends the power of Pandas library to AWS connecting DataFrames and AWS data related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, etc).

Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.

Supported services

The aim of the library is to simplify interaction with data across the supported AWS services. AWS Data Wrangler currently supports the following services:

  • Amazon S3
  • AWS Glue Catalog
  • Amazon Athena
  • Databases (Redshift, PostgreSQL & MySQL)
  • EMR
  • CloudWatch Logs

As the library extends the power of the Pandas library to AWS by connecting DataFrames and AWS data-related services, most of the available operations that deal directly with loading or flushing data rely on Pandas DataFrames.

Simplifying interactions

However, the package is not focused only on loading and unloading data. It is also meant to simplify interactions with the services.

The library provides, for example, functions to:

  • Load / unload data for Redshift
  • Generate a Redshift copy manifest instead of having to generate it yourself

but also to:

  • Simplify the creation of EMR clusters or the definition and submission of build steps.
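To give an idea of what generating a Redshift copy manifest by yourself involves, here is a minimal sketch using only the standard library. The helper name build_copy_manifest and the S3 paths are hypothetical and are not part of AWS Data Wrangler; the sketch only covers the basic manifest layout, a JSON document with a list of entries.

```python
import json


def build_copy_manifest(urls, mandatory=True):
    """Return a Redshift COPY manifest as a JSON string.

    Hypothetical helper for illustration only, not an awswrangler function.
    """
    # Each entry points to one S3 object that COPY should load.
    entries = [{"url": url, "mandatory": mandatory} for url in urls]
    return json.dumps({"entries": entries}, indent=2)


# Made-up object paths standing in for files unloaded to S3.
manifest = build_copy_manifest([
    "s3://my-bucket/unload/part-0000.csv",
    "s3://my-bucket/unload/part-0001.csv",
])
print(manifest)
```

The resulting document still has to be uploaded to S3 and referenced from the COPY command, which is the kind of plumbing the library aims to take care of.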

Interacting with Amazon Athena

Interacting with Amazon Athena can be cumbersome. To reduce the burden, the library provides functions that make things easier, for example to start, stop, or wait for the completion of a query.
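As an illustration of the burden, waiting for a query to complete without such helpers usually means writing a polling loop yourself. The sketch below assumes a client that behaves like the boto3 Athena client and its get_query_execution call; FakeAthenaClient and the query id are hypothetical stand-ins so that the example runs without AWS access.

```python
import time

# States after which an Athena query will not change anymore.
FINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, query_execution_id, poll_seconds=0.25):
    """Poll until the query reaches a final state and return that state.

    `client` is expected to behave like boto3.client("athena").
    """
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in FINAL_STATES:
            return state
        time.sleep(poll_seconds)


class FakeAthenaClient:
    """Hypothetical stand-in returning a scripted sequence of states."""

    def __init__(self, states):
        self._states = iter(states)

    def get_query_execution(self, QueryExecutionId):
        return {"QueryExecution": {"Status": {"State": next(self._states)}}}


fake = FakeAthenaClient(["QUEUED", "RUNNING", "SUCCEEDED"])
final_state = wait_for_query(fake, "hypothetical-query-id", poll_seconds=0)
print(final_state)
```

A real implementation would also need timeouts and error handling for failed or cancelled queries, which is what makes a ready-made wait function convenient.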

The goodness does not stop at simplified Amazon Athena interactions. You will also find improvements for interacting with the AWS Glue Data Catalog, making code straightforward to write.

AWS Data Wrangler as the default way to interact?

Given all these improvements over the standard APIs, it should be a no-brainer to use it as your default way to interact with the supported services in a Python data processing context.

Let's now go deeper into more detailed examples and notions around the AWS Data Wrangler package, starting with sessions.

Sessions

AWS Data Wrangler interacts with AWS services using a default Boto3 session. That's why, most of the time, you won't have to provide any session information. However, if you need to customize the session the module works with, you can reconfigure the default boto3 session:

import boto3
boto3.setup_default_session(region_name="eu-west-1")

or even instantiate a new boto3 session and pass it as a named parameter to function calls:

session = boto3.Session(region_name="us-east-2")
wr.s3.does_object_exist("s3://foo/bar", boto3_session=session)

Amazon S3

As mentioned previously, an s3 variable is available at the root of the AWS Data Wrangler module. It essentially lets you interact with the Amazon S3 service to work on CSV, JSON, Parquet and fixed-width formatted files, and gives access to some handy functions purely related to file manipulation.

Let's first define 2 DataFrames:

import awswrangler as wr
import pandas as pd
import boto3

df1 = pd.DataFrame({
    "id": [1, 2],
    "name": ["foo", "boo"]
})

df2 = pd.DataFrame({
    "id": [3],
    "name": ["bar"]
})

With those 2 DataFrames created, they can simply be written to S3 this way:

bucket = "my-bucket"

path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"

wr.s3.to_csv(df1, path1, index=False)
wr.s3.to_csv(df2, path2, index=False)

The previously written files can be read back in a similar fashion:

df1_bis = wr.s3.read_csv(path1)

df1_bis and df1 should contain the exact same data.

It is also possible to read multiple CSV files at once by explicitly listing the files to read:

wr.s3.read_csv([path1, path2])

Things can be made even easier by providing only the prefix to read data from:

wr.s3.read_csv(f"s3://{bucket}/csv/")

As seen in the examples above, it is very easy to interact with S3 without having to deal with code complexity or boilerplate.
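To picture what reading several CSV files at once gives you, here is a local analogue that uses only Pandas and in-memory text instead of S3 objects. It assumes that the rows of all the files end up stacked in a single DataFrame; how the library handles the index may differ from the ignore_index choice made here.

```python
import io

import pandas as pd

df1 = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"]})
df2 = pd.DataFrame({"id": [3], "name": ["bar"]})

# Serialize each DataFrame to CSV text, standing in for the two S3 objects.
csv1 = df1.to_csv(index=False)
csv2 = df2.to_csv(index=False)

# Read each "file" back and stack the rows into one DataFrame.
frames = [pd.read_csv(io.StringIO(text)) for text in (csv1, csv2)]
combined = pd.concat(frames, ignore_index=True)

print(combined)
```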

AWS Glue Data Catalog

Having tried the library against Amazon S3, the next step is to interact directly with the AWS Glue Data Catalog.

To do so, the user just has to use the catalog variable of the module:

wr.catalog.databases()

The previous command should return the list of databases this way:

   Database          Description
0  awswrangler_test  AWS Data Wrangler Test Arena - Glue Database
1  default           Default Hive database
2  sampledb          Sample database

It would not be that simple using the boto3 API directly. Listing the tables available in a specific database is just as simple:

wr.catalog.tables(database="awswrangler_test")

The command should return the following result:

   Database          Table   Description  Columns                                             Partitions
0  awswrangler_test  lambda               col1, col2
1  awswrangler_test  noaa                 id, dt, element, value, m_flag, q_flag, s_flag...

Now, to get the details of a table, meaning its column information, you just need to call the table() function on the catalog variable:

wr.catalog.table(database="awswrangler_test", table="boston")

The command should return the following field list:

    Column Name  Type    Partition  Comment
0   crim         double  False      per capita crime rate by town
1   zn           double  False      proportion of residential land zoned for lots ...
2   indus        double  False      proportion of non-retail business acres per town
3   chas         double  False      Charles River dummy variable (= 1 if tract bou...
4   nox          double  False      nitric oxides concentration (parts per 10 mill...
5   rm           double  False      average number of rooms per dwelling
6   age          double  False      proportion of owner-occupied units built prior...
7   dis          double  False      weighted distances to five Boston employment c...
8   rad          double  False      index of accessibility to radial highways
9   tax          double  False      full-value property-tax rate per $10,000
10  ptratio      double  False      pupil-teacher ratio by town
11  b            double  False      1000(Bk - 0.63)^2 where Bk is the proportion o...
12  lstat        double  False      lower status of the population
13  target       double  False

You may wonder, however, how to create a table, let's say in Parquet format. To do so, call the to_parquet() function on the s3 variable, providing the required parameters:

Parameter         Type                      Description
df                pandas.DataFrame          Pandas DataFrame (https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
path              str                       S3 path (for a file, e.g. s3://bucket/prefix/filename.parquet; for a dataset, e.g. s3://bucket/prefix)
dataset           bool                      If True, store a Parquet dataset instead of a single file. If True, enables all the following arguments: partition_cols, mode, database, table, description, parameters, columns_comments.
database          str, optional             Glue/Athena catalog: database name
table             str, optional             Glue/Athena catalog: table name
mode              str, optional             append (default), overwrite, overwrite_partitions. Only takes effect if dataset=True.
description       str, optional
parameters        Dict[str, str], optional
columns_comments  Dict[str, str], optional

All parameters can be found at the following URL: https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet.

Writing a pandas DataFrame to S3 in Parquet format and referencing it in the Glue Data Catalog can be done as follows:

desc = """This is a copy of UCI ML housing dataset. https://archive.ics.uci.edu/ml/machine-learning-databases/housing/
This dataset was taken from the StatLib library which is maintained at Carnegie Mellon University.
The Boston house-price data of Harrison, D. and Rubinfeld, D.L. ‘Hedonic prices and the demand for clean air’, J. Environ. Economics & Management, vol.5, 81-102, 1978. Used in Belsley, Kuh & Welsch, ‘Regression diagnostics …’, Wiley, 1980. N.B. Various transformations are used in the table on pages 244-261 of the latter.
The Boston house-price data has been used in many machine learning papers that address regression problems.
"""

param = {
    "source": "scikit-learn",
    "class": "cities"
}

comments = {
    "crim": "per capita crime rate by town",
    "zn": "proportion of residential land zoned for lots over 25,000 sq.ft.",
    "indus": "proportion of non-retail business acres per town",
    "chas": "Charles River dummy variable (= 1 if tract bounds river; 0 otherwise)",
    "nox": "nitric oxides concentration (parts per 10 million)",
    "rm": "average number of rooms per dwelling",
    "age": "proportion of owner-occupied units built prior to 1940",
    "dis": "weighted distances to five Boston employment centres",
    "rad": "index of accessibility to radial highways",
    "tax": "full-value property-tax rate per $10,000",
    "ptratio": "pupil-teacher ratio by town",
    "b": "1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town",
    "lstat": "lower status of the population",
}

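# df is assumed to hold the Boston housing data (one column per key in
# comments, plus "target"); bucket is the bucket name defined earlier.
res = wr.s3.to_parquet(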
    df=df,
    path=f"s3://{bucket}/boston",
    dataset=True,
    database="awswrangler_test",
    table="boston",
    mode="overwrite",
    description=desc,
    parameters=param,
    columns_comments=comments
)

This code example is sourced from the AWS Data Wrangler tutorials, and more specifically the following one: https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/005%20-%20Glue%20Catalog.ipynb.

Running the previous code sample registers the boston table in the AWS Glue Data Catalog, along with the description, parameters and column comments provided.

Amazon Athena

Now that we have learned to interact with Amazon S3 and the AWS Glue Data Catalog, and know how to flush DataFrames to S3 and reference them as a dataset in the Data Catalog, we can focus on how to query the stored data with Amazon Athena.

AWS Data Wrangler can run queries on Athena and fetch the results in two ways:

  • Using CTAS (ctas_approach=True), which is the default method.
  • Using regular queries (ctas_approach=False), and parsing the CSV results on S3.

ctas_approach=True

As mentioned in the tutorials, this first approach wraps the query in a CTAS (CREATE TABLE AS SELECT) statement and reads the table data as Parquet directly from S3. It is faster because it relies on Parquet rather than CSV, and it also enables support for nested types. It is mostly a trick compared to the approach officially provided by the API, but it is effective and perfectly legitimate.

The trade-off of this approach is that you need additional permissions on Glue (create/delete table permissions are required). Behind the scenes, a temporary table is created and then deleted immediately after consumption.

Query example:

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
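To make the CTAS wrapping described above more concrete, here is a rough sketch of what such a wrapped statement can look like. The function wrap_in_ctas, the temporary table name and the S3 location are illustrative assumptions; the exact statement generated by the library is not shown in the tutorials quoted here and may differ.

```python
def wrap_in_ctas(sql, temp_table, s3_output):
    """Wrap a SELECT statement in an Athena CTAS statement writing Parquet.

    Illustrative helper only, not the statement awswrangler actually builds.
    """
    return (
        f"CREATE TABLE {temp_table}\n"
        f"WITH (format = 'PARQUET', external_location = '{s3_output}')\n"
        f"AS {sql}"
    )


ctas_sql = wrap_in_ctas(
    "SELECT * FROM noaa",
    temp_table="temp_table_hypothetical",       # made-up temporary table name
    s3_output="s3://my-bucket/athena-temp/",    # made-up output location
)
print(ctas_sql)
```

Since the statement creates a table, it explains why this approach needs the Glue create/delete table permissions mentioned above.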

ctas_approach=False

The regular approach, which parses the CSV produced on S3 as the query execution result, does not require additional permissions. Reading the results will not be as fast as with the CTAS approach, but it will still be faster than reading results with the standard AWS APIs.

Query example:

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)

The only difference from the previous example is that the ctas_approach parameter is set to False instead of the default True.

Use of categories

Defining DataFrame columns as categories speeds up execution and also helps save memory. You only need to pass an additional categories parameter to the function to benefit from the improvement:

wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
)

The returned columns are of type pandas.Categorical.
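The memory benefit of categories can be checked locally with Pandas alone. The sketch below compares the same column stored with the default string storage and as a category; the sample values are made up for the illustration, and the exact byte counts depend on your Pandas version.

```python
import pandas as pd

# A column with many rows but only a handful of distinct values.
elements = pd.Series(["TMAX", "TMIN", "PRCP", "SNOW"] * 25_000)

# deep=True also counts the memory taken by the strings themselves.
as_strings = elements.memory_usage(deep=True)
as_category = elements.astype("category").memory_usage(deep=True)

print(f"default storage: {as_strings:,} bytes")
print(f"category dtype:  {as_category:,} bytes")
```

The gain comes from storing each distinct value once and keeping only small integer codes per row, so it is largest for columns with few distinct values.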

Batching read of results

This option is useful in memory-constrained environments. It is activated by passing the chunksize parameter; when an integer is provided, it sets the number of rows in each chunk. Reading datasets this way limits the memory used, but also means that the full results have to be read by iterating over the chunks.

Query example:

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    ctas_approach=False,
    chunksize=10_000_000
)

for df in dfs:  # Batching
    print(len(df.index))

Since big datasets can be challenging to load and read, this is a good way to avoid memory issues.
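When iterating over chunks, any aggregate has to be accumulated across iterations rather than computed on one DataFrame. Here is a local analogue of that pattern using pandas.read_csv with chunksize on in-memory text; it only assumes that the chunked Athena reader yields DataFrames in a similar way.

```python
import io

import pandas as pd

# Build a small CSV in memory to stand in for a large query result.
csv_text = "id,value\n" + "\n".join(f"{i},{i * 10}" for i in range(10))

total_rows = 0
running_sum = 0

# With chunksize set, pandas returns an iterator of DataFrames.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    total_rows += len(chunk.index)
    running_sum += int(chunk["value"].sum())

print(total_rows, running_sum)
```

Only one chunk is held in memory at a time, which is the point of the option.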

Packaging & Dependencies

Availability as an AWS Lambda layer

Going beyond the toy demo, you may wonder how to integrate the library with your own code. Can it be integrated easily with, for example, AWS Lambda functions? Will you have to build a complex pipeline to integrate it properly into your AWS Lambda package?

The answer is definitely: no! A Lambda Layer zip file is available alongside the Python wheels and eggs. The Lambda Layers are currently available in 3 flavors: Python 3.6, 3.7 & 3.8.

AWS Glue integration

As the AWS Data Wrangler package relies on compiled dependencies (C/C++), there is no support for Glue PySpark for now. Only integration with Glue Python Shell is possible at the moment.

Going one step deeper

If you want to learn more about the library, feel free to read the documentation, as it is a good source of inspiration. You can also visit the GitHub repository of the project and browse the tutorials directory.