AWS for Industries
Accelerating Fraud Detection in Financial Services with NVIDIA RAPIDS™ Accelerator for Apache Spark on AWS
Transaction fraud poses a growing challenge that financial institutions struggle to combat effectively. Recent estimates forecast that payment fraud losses will exceed $326 billion between 2023 and 2028, with remote purchases of physical goods accounting for over 47 percent of those losses¹. Legacy CPU-based systems rely on sequential processing, which drives up operational costs and limits throughput. By applying NVIDIA AI, accelerated computing, and parallel processing to fraud prevention, AWS has developed a powerful approach to this problem: we integrate the NVIDIA Fraud Detection AI Workflow, which uses NVIDIA RAPIDS™ together with graph neural network (GNN) embeddings, for both acceleration and accuracy. This blog focuses on the data processing portion of that workflow.
The RAPIDS Accelerator for Apache Spark is a plug-in that leverages RAPIDS libraries and GPUs to accelerate data processing and machine learning pipelines on Apache Spark, without requiring changes to existing pipeline code. It enables processing of enormous volumes of sensitive financial data at high speed while maintaining regulatory compliance and data security, and it enhances workloads such as fraud detection, risk assessment, and portfolio optimization.
This blog shows a GPU-accelerated fraud detection workflow using the RAPIDS Accelerator for Apache Spark and Apache Spark on AWS. The first part of the workflow focuses on data processing, where we showcase improvements in processing speed and cost efficiency compared to CPU-based alternatives.
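Once a cluster is configured as described in the next section, a quick sanity check helps confirm the plug-in is actually active before running the full pipeline. The following is a minimal sketch; the S3 path is a placeholder, and it assumes the cluster already has the RAPIDS Accelerator jars and settings shown below in place:

from pyspark.sql import SparkSession

# Minimal sketch: confirm the RAPIDS Accelerator is active in the current Spark session.
spark = SparkSession.builder.getOrCreate()
print(spark.conf.get("spark.plugins"))             # expect com.nvidia.spark.SQLPlugin
print(spark.conf.get("spark.rapids.sql.enabled"))  # expect "true"

# On a GPU-enabled cluster, GPU operators (GpuScan, GpuHashAggregate, ...) replace
# their CPU counterparts in the physical plan.
df = spark.read.parquet("s3://your-bucket/transactions/")  # placeholder path
df.groupBy("CUSTOMER_ID").count().explain()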
Amazon EMR Clusters with NVIDIA GPUs: Boosting Speed and Efficiency
To leverage the RAPIDS Accelerator for Apache Spark for fraud detection, you must configure the Amazon EMR cluster with GPU-enabled instances and specific settings to optimize performance. Below are the details for setting up both GPU and CPU clusters:
GPU Cluster Configuration
- Instances:
- Primary Node: m5.xlarge
- Core Nodes: 12 nodes of g6.4xlarge (NVIDIA L4 GPU-enabled)
- Follow the Amazon EMR documentation at https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-rapids.html for the latest instructions on enabling Spark RAPIDS on EMR.
Below is an example.
- Bootstrap Script:
The bootstrap script sets up the environment for GPU acceleration, including mounting cgroups and installing necessary libraries:
#!/bin/bash
set -ex
sudo mkdir -p /spark-rapids-cgroup/devices
sudo mount -t cgroup -o devices cgroupv1-devices /spark-rapids-cgroup/devices
sudo chmod a+rwx -R /spark-rapids-cgroup
sudo pip3 install numpy
- JSON Configuration:
The JSON configuration enables GPU-specific settings for Spark and YARN, ensuring optimal resource allocation and GPU utilization:
[
{
"Classification": "spark",
"Properties": {
"enableSparkRapids": "true"
}
},
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.container-executor.class": "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy": "yarn",
"yarn.nodemanager.linux-container-executor.cgroups.mount": "true",
"yarn.nodemanager.linux-container-executor.cgroups.mount-path": "/spark-rapids-cgroup",
"yarn.nodemanager.resource-plugins": "yarn.io/gpu",
"yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices": "auto",
"yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables": "/usr/bin",
"yarn.resource-types": "yarn.io/gpu"
}
},
{
"Classification": "container-executor",
"Configurations": [
{
"Classification": "gpu",
"Properties": {
"module.enabled": "true"
}
},
{
"Classification": "cgroups",
"Properties": {
"root": "/spark-rapids-cgroup",
"yarn-hierarchy": "yarn"
}
}
],
"Properties": {}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.driver.memory": "8G",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.cores": "15",
"spark.executor.extraLibraryPath": "/usr/local/cuda/targets/x86_64-linux/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/compat/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native",
"spark.executor.instances": "12",
"spark.executor.memory": "30G",
"spark.executor.memoryOverhead": "10G",
"spark.executor.resource.gpu.amount": "1",
"spark.executor.resource.gpu.discoveryScript": "/usr/lib/spark/scripts/gpu/getGpusResources.sh",
"spark.network.timeout": "1600s",
"spark.plugins": "com.nvidia.spark.SQLPlugin",
"spark.rapids.memory.pinnedPool.size": "8G",
"spark.rapids.sql.concurrentGpuTasks": "3",
"spark.rapids.sql.enabled": "true",
"spark.sql.adaptive.enabled": "true",
"spark.submit.deployMode": "client",
"spark.submit.pyFiles": "/usr/lib/spark/jars/xgboost4j-spark_3.0-1.4.2-0.3.0.jar",
"spark.task.cpus": "1",
"spark.task.resource.gpu.amount": "0.0667"
}
},
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
},
{
"Classification": "livy-conf",
"Properties": {
"livy.server.spark.deploy-mode=client": "client"
}
}
]
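For reference, a cluster with this configuration can also be launched programmatically. The following is a minimal sketch using boto3; the bootstrap script path, configuration file name, EMR release label, and IAM role names are placeholders, so check the EMR Spark RAPIDS documentation linked above for a supported release before using it:

import json
import boto3

emr = boto3.client("emr", region_name="us-east-1")

# The JSON configuration shown above, saved locally (assumed file name).
with open("gpu-configurations.json") as f:
    configurations = json.load(f)

response = emr.run_job_flow(
    Name="rapids-fraud-detection-gpu",
    ReleaseLabel="emr-7.0.0",  # assumption: choose a release that supports Spark RAPIDS
    Applications=[{"Name": "Spark"}, {"Name": "Livy"}],
    Instances={
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "g6.4xlarge", "InstanceCount": 12},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    BootstrapActions=[{
        "Name": "gpu-setup",
        "ScriptBootstrapAction": {"Path": "s3://your-bucket/bootstrap-gpu.sh"},  # placeholder
    }],
    Configurations=configurations,
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])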
CPU Cluster Configuration
- Instances:
- Primary Node: m5.xlarge
- Core Nodes: 12 nodes of either r7i.4xlarge or r7a.4xlarge
- Bootstrap Script:
The bootstrap script installs necessary Python libraries for CPU-based processing:
#!/bin/bash
# Install pip if not already installed
sudo yum install -y python3-pip
# Upgrade pip
sudo python3 -m pip install --upgrade pip
# Install required Python libraries
sudo python3 -m pip install boto3 faker pyarrow pandas==1.3.5 numpy==1.21 mimesis farsante
- JSON Configuration:
The JSON configuration for CPU clusters focuses on memory and core allocation:
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.driver.memory": "8G",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.cores": "15",
"spark.executor.instances": "12",
"spark.executor.memory": "80G",
"spark.executor.memoryOverhead": "20G",
"spark.network.timeout": "1600s",
"spark.sql.adaptive.enabled": "true",
"spark.submit.deployMode": "client"
}
},
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
},
{
"Classification": "livy-conf",
"Properties": {
"livy.server.spark.deploy-mode=client": "client"
}
}
]
Step-by-Step Breakdown of Data Processing and Feature Engineering for the Fraud Detection Pipeline
The Jupyter notebook in the repository outlines a scalable pipeline for fraud detection. Below are the detailed steps:
Step 1: Initialize Spark Session with GPU Optimizations
Configure a Spark session to leverage GPU acceleration. Critical settings include:
- Kryo Serialization: Reduces data shuffling overhead.
- Memory Allocation: 80GB executor memory for handling large datasets.
- Shuffle Partitions: 20,000 partitions to optimize parallel processing by reducing heavy spilling in the shuffle stage.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.executor.memory", "80G") \
    .config("spark.sql.shuffle.partitions", "20000") \
    .getOrCreate()
Step 2: Load and Prepare Data
Load the datasets (customers, terminals, transactions) from Amazon S3 and repartition them for distributed processing:
- Transactions: 1,000 partitions to balance workload.
- Terminals: Small enough to be broadcast to all nodes for efficient joins (see the sketch after the code below).
customers_df = spark.read.parquet(customers_path).repartition(300)
terminals_df = spark.read.parquet(terminals_path)
transactions_df = spark.read.parquet(transactions_path).repartition(1000)
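Because the terminals table is comparatively small, joining against it can use a broadcast hint rather than a shuffle. Below is a minimal, illustrative sketch; it assumes TERMINAL_ID as the join key, whereas the pipeline itself performs its joins after indexing in Step 9:

from pyspark.sql.functions import broadcast

# Broadcast the small terminals table so the join does not shuffle the much
# larger transactions DataFrame (illustrative; assumes TERMINAL_ID is the key).
enriched_df = transactions_df.join(broadcast(terminals_df), on="TERMINAL_ID", how="left")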
Step 3: Convert TX_DATETIME to Timestamp
Convert the TX_DATETIME column to a timestamp type for time-based feature extraction:
from pyspark.sql import functions as F

transactions_df = transactions_df.withColumn(
    "TX_DATETIME", F.col("TX_DATETIME").cast("timestamp"))
Step 4: Extract Date Components
Split the TX_DATETIME column into year, month, and day to derive additional temporal features:
transactions_df = transactions_df.withColumn("yyyy", year(F.col("TX_DATETIME"))) \
.withColumn("mm", month(F.col("TX_DATETIME"))) \
.withColumn("dd", dayofmonth(F.col("TX_DATETIME")))
Step 5: Define Time Windows for Feature Engineering
Define time windows for rolling aggregations (e.g., 15 minutes, 1 day, 30 days):
time_windows = {
    "15min": 15 * 60,
    "30min": 30 * 60,
    "60min": 60 * 60,
    "1day": 24 * 60 * 60,
    "7day": 7 * 24 * 60 * 60,
    "15day": 15 * 24 * 60 * 60,
    "30day": 30 * 24 * 60 * 60
}
Step 6: Add Window Features
Use GPU-accelerated window functions to compute rolling transaction counts and average amounts for each customer and terminal:
from pyspark.sql.window import Window

def add_window_features(transactions_df, time_windows, entity_id_col, prefix):
    for window_name, window_duration in time_windows.items():
        # Range-based window covering the preceding window_duration seconds per entity
        window_spec = Window.partitionBy(entity_id_col).orderBy(
            F.col("TX_DATETIME").cast("long")).rangeBetween(-window_duration, 0)
        transactions_df = transactions_df.withColumn(
            f"{prefix}_nb_txns_{window_name}_window", F.count("*").over(window_spec))
        transactions_df = transactions_df.withColumn(
            f"{prefix}_avg_amt_{window_name}_window", F.avg("TX_AMOUNT").over(window_spec))
    return transactions_df
transactions_df = add_window_features(transactions_df, time_windows, "CUSTOMER_ID", "customer_id")
transactions_df = add_window_features(transactions_df, time_windows, "TERMINAL_ID", "terminal_id")
Step 7: Categorical Encoding
Encode string columns (e.g., CUSTOMER_ID, TERMINAL_ID) using StringIndexer for model compatibility:
from pyspark.ml.feature import StringIndexer

customer_indexer = StringIndexer(inputCol="CUSTOMER_ID", outputCol="CUSTOMER_ID_index")
transactions_df = customer_indexer.fit(transactions_df).transform(transactions_df)
Step 8: One-Hot Encoding for Fraud Labels
One-hot encode the TX_FRAUD column to prepare for binary classification:
transactions_df = transactions_df.withColumn("TX_FRAUD_0", (F.col("TX_FRAUD") == 0).cast("int"))
transactions_df = transactions_df.withColumn("TX_FRAUD_1", (F.col("TX_FRAUD") == 1).cast("int"))
Step 9: Join Datasets
Join the enriched transactions data with customer and terminal details:
final_df = transactions_df.join(customers_df, on="CUSTOMER_ID_index", how="left") \
.join(terminals_df, on="TERMINAL_ID_index", how="left")
Step 10: Save Final Dataset
Save the final dataset to S3 in Parquet format for downstream analytics:
final_df.write.mode("overwrite").parquet("s3://path/to/output/")
Performance Benchmarks and Cost Efficiency
As the benchmarks below highlight, the GPU-accelerated workflow on AWS delivers substantial gains:
Primary Instance Type | Primary Node Count | Primary Node Hourly Cost | Core Instance Type | Core Node Count | Core Node Hourly Cost | Run Time (Minutes) | Run Time (Hours) | Total Cost
CPU (m5.xlarge) | 1 | $0.192 | GPU (g6.4xlarge) | 12 | $1.323 | 43 | 0.72 | $11.517
CPU (m5.xlarge) | 1 | $0.192 | CPU (r7i.4xlarge) | 12 | $1.058 | 450 | 7.50 | $96.660
CPU (m5.xlarge) | 1 | $0.192 | CPU (r7a.4xlarge) | 12 | $1.217 | 246 | 4.10 | $60.664
Key takeaways:
We observed up to a 10.5x speedup with the GPU-based workflow (43 minutes versus 450 minutes), fast enough to support near-real-time fraud alerting. The shorter runtime and optimized resource usage also reduced infrastructure costs by roughly 88 percent.
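These headline numbers follow directly from the benchmark table; here is a quick back-of-the-envelope check in Python:

# Back-of-the-envelope check using the hourly rates and runtimes from the table above.
gpu_cost = (0.192 * 1 + 1.323 * 12) * (43 / 60)    # ~= $11.5 for the g6.4xlarge cluster
cpu_cost = (0.192 * 1 + 1.058 * 12) * (450 / 60)   # ~= $96.7 for the r7i.4xlarge cluster

speedup = 450 / 43                 # ~= 10.5x faster on GPUs
savings = 1 - gpu_cost / cpu_cost  # ~= 0.88, i.e. roughly 88% lower cost
print(round(speedup, 1), round(savings, 2))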
Conclusion
The RAPIDS Accelerator for Apache Spark, integrated with AWS, transforms fraud detection pipelines by delivering strong performance and cost efficiency. By accelerating critical operations like window functions, joins, and encoding, financial institutions can process terabytes of transactional data in minutes rather than hours. Faster feature engineering also makes it easier to experiment with new features and identify the most predictive ones for model training. The benchmarks underscore the ability of RAPIDS to reduce costs by up to 88 percent compared to CPU-based workflows, while its scalability ensures readiness for future data growth.
For organizations aiming to stay ahead of sophisticated fraud tactics, adopting GPU-accelerated workflows is no longer optional—it’s imperative. By combining the RAPIDS Accelerator for Apache Spark with AWS’s elastic infrastructure, businesses achieve real-time insights, minimize losses, and build a future-proof fraud detection system.
Join our sessions at NVIDIA GTC, taking place in San Jose, CA from March 17 to 21.
- Advancing Transaction Fraud Detection in Financial Services With NVIDIA RAPIDS on AWS
- Reduce Fraud and Financial Crime With Featurespace ARIC Risk Hub on AWS
Learn more about the NVIDIA Fraud Detection AI Workflow
Ready to supercharge your pipeline? Explore the RAPIDS Accelerator for Apache Spark and AWS GPU instances today:
- Accelerating Fraud Detection in Financial Services with NVIDIA RAPIDS on AWS: GitHub repository showing how to accelerate fraud detection workflows
- Integrate Spark RAPIDS: Accelerate data processing with the latest EMR version.
- Assess Your Workloads: Use NVIDIA’s Qualification Tool to identify opportunities.
- Explore Documentation: Installation, configuration, and advanced usage guides.
- Join the Community: Connect with other Spark RAPIDS users.
[1] “Online Payment Fraud: Emerging Threats, Segment Analysis, & Market Forecasts 2023-2028,” Juniper Research, June 2023