AWS for Industries

Accelerating Fraud Detection in Financial Services with NVIDIA RAPIDS™ Accelerator for Apache Spark on AWS

Transaction fraud poses a growing challenge that financial institutions struggle to combat effectively. Recent estimates forecast that payment fraud losses will exceed $326 billion between 2023 and 2028, with remote purchases of physical goods accounting for over 47 percent of those losses¹. Legacy CPU-based systems rely on sequential processing, leading to increased operational costs and inefficiencies. By applying NVIDIA AI, accelerated computing, and parallel processing to fraud prevention, AWS has developed a powerful approach to this problem. We integrate the NVIDIA Fraud Detection AI Workflow using NVIDIA RAPIDS™ with graph neural network (GNN) embeddings for acceleration and accuracy. In this blog, we focus on the data processing portion of this workflow.

The RAPIDS Accelerator for Apache Spark is a plug-in that leverages RAPIDS libraries and GPUs to accelerate data processing and machine learning pipelines on Apache Spark. It accelerates existing pipelines without code changes, enabling enormous volumes of sensitive financial data to be processed at high speed while maintaining regulatory compliance and data security. The RAPIDS Accelerator enhances fraud detection, risk assessment, and portfolio optimization.

This blog shows a GPU-accelerated fraud detection workflow using the RAPIDS Accelerator for Apache Spark and Apache Spark on AWS. The first part of the workflow focuses on data processing, where we showcase improvements in processing speed and cost efficiency compared to CPU-based alternatives.

Amazon EMR Clusters with NVIDIA GPUs: Boosting Speed and Efficiency

To leverage the RAPIDS Accelerator for Apache Spark for fraud detection, you must configure the Amazon EMR cluster with GPU-enabled instances and specific settings to optimize performance. Below are the details for setting up both GPU and CPU clusters:

GPU Cluster Configuration

Below is an example configuration, matching the benchmark setup described later in this post.

  • Instances:
    • Primary Node: M5.xlarge
    • Core Nodes: 12 nodes of G6.4xlarge (GPU)

  • Bootstrap Script:
    The bootstrap script sets up the environment for GPU acceleration, including mounting cgroups and installing necessary libraries:
#!/bin/bash
set -ex

sudo mkdir -p /spark-rapids-cgroup/devices
sudo mount -t cgroup -o devices cgroupv1-devices /spark-rapids-cgroup/devices
sudo chmod a+rwx -R /spark-rapids-cgroup
sudo pip3 install numpy
  • JSON Configuration:
    The JSON configuration enables GPU-specific settings for Spark and YARN, ensuring optimal resource allocation and GPU utilization:
[
  {
    "Classification": "spark",
    "Properties": {
      "enableSparkRapids": "true"
    }
  },
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.container-executor.class": "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
      "yarn.nodemanager.linux-container-executor.cgroups.hierarchy": "yarn",
      "yarn.nodemanager.linux-container-executor.cgroups.mount": "true",
      "yarn.nodemanager.linux-container-executor.cgroups.mount-path": "/spark-rapids-cgroup",
      "yarn.nodemanager.resource-plugins": "yarn.io/gpu",
      "yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices": "auto",
      "yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables": "/usr/bin",
      "yarn.resource-types": "yarn.io/gpu"
    }
  },
  {
    "Classification": "container-executor",
    "Configurations": [
      {
        "Classification": "gpu",
        "Properties": {
          "module.enabled": "true"
        }
      },
      {
        "Classification": "cgroups",
        "Properties": {
          "root": "/spark-rapids-cgroup",
          "yarn-hierarchy": "yarn"
        }
      }
    ],
    "Properties": {}
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.memory": "8G",
      "spark.dynamicAllocation.enabled": "false",
      "spark.executor.cores": "15",
      "spark.executor.extraLibraryPath": "/usr/local/cuda/targets/x86_64-linux/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/compat/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native",
      "spark.executor.instances": "12",
      "spark.executor.memory": "30G",
      "spark.executor.memoryOverhead": "10G",
      "spark.executor.resource.gpu.amount": "1",
      "spark.executor.resource.gpu.discoveryScript": "/usr/lib/spark/scripts/gpu/getGpusResources.sh",
      "spark.network.timeout": "1600s",
      "spark.plugins": "com.nvidia.spark.SQLPlugin",
      "spark.rapids.memory.pinnedPool.size": "8G",
      "spark.rapids.sql.concurrentGpuTasks": "3",
      "spark.rapids.sql.enabled": "true",
      "spark.sql.adaptive.enabled": "true",
      "spark.submit.deployMode": "client",
      "spark.submit.pyFiles": "/usr/lib/spark/jars/xgboost4j-spark_3.0-1.4.2-0.3.0.jar",
      "spark.task.cpus": "1",
      "spark.task.resource.gpu.amount": "0.0667"
    }
  },
  {
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    }
  },
  {
    "Classification": "livy-conf",
    "Properties": {
      "livy.server.spark.deploy-mode=client": "client"
    }
  }
]
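
If you prefer to provision the cluster programmatically rather than through the console, the same setup can be expressed with the EMR run_job_flow API in boto3. The sketch below is a minimal, illustrative example: the S3 bootstrap path, local configuration file name, region, subnet ID, and EMR release label are assumptions to replace with values from your own environment, and it presumes the default EMR service roles already exist in the account.

import json

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # hypothetical region

# Hypothetical S3 location for the bootstrap script shown above.
BOOTSTRAP_PATH = "s3://my-bucket/bootstrap/spark-rapids-bootstrap.sh"

# Load the GPU JSON configuration shown above from a local file (hypothetical name).
with open("gpu_cluster_config.json") as f:
    gpu_configurations = json.load(f)

response = emr.run_job_flow(
    Name="fraud-detection-gpu-cluster",
    ReleaseLabel="emr-6.10.0",  # use an EMR release that supports the RAPIDS Accelerator and your instance type
    Applications=[{"Name": "Spark"}, {"Name": "Livy"}, {"Name": "JupyterEnterpriseGateway"}],
    Instances={
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "g6.4xlarge", "InstanceCount": 12},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "Ec2SubnetId": "subnet-0123456789abcdef0",  # hypothetical subnet
    },
    BootstrapActions=[
        {"Name": "spark-rapids-setup",
         "ScriptBootstrapAction": {"Path": BOOTSTRAP_PATH}},
    ],
    Configurations=gpu_configurations,
    ServiceRole="EMR_DefaultRole",      # assumes the default EMR roles exist
    JobFlowRole="EMR_EC2_DefaultRole",
)
print("Cluster ID:", response["JobFlowId"])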

CPU Cluster Configuration

  • Instances:
    • Primary Node: M5.xlarge
    • Core Nodes: 12 nodes of either R7i.4xlarge or R7a.4xlarge
  • Bootstrap Script:
    The bootstrap script installs necessary Python libraries for CPU-based processing:
#!/bin/bash

# Install pip if not already installed
sudo yum install -y python3-pip

# Upgrade pip
sudo python3 -m pip install --upgrade pip

# Install required Python libraries
sudo python3 -m pip install boto3 faker pyarrow pandas==1.3.5 numpy==1.21 mimesis farsante
  • JSON Configuration:
    The JSON configuration for CPU clusters focuses on memory and core allocation:
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.memory": "8G",
      "spark.dynamicAllocation.enabled": "false",
      "spark.executor.cores": "15",
      "spark.executor.instances": "12",
      "spark.executor.memory": "80G",
      "spark.executor.memoryOverhead": "20G",
      "spark.network.timeout": "1600s",
      "spark.sql.adaptive.enabled": "true",
      "spark.submit.deployMode": "client"
    }
  },
  {
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    }
  },
  {
    "Classification": "livy-conf",
    "Properties": {
      "livy.server.spark.deploy-mode=client": "client"
    }
  }
]

Step-by-Step Breakdown of Data Processing and Feature Engineering for the Fraud Detection Pipeline

The Jupyter notebook in the repository outlines a scalable pipeline for fraud detection. Below are the detailed steps:

Step 1: Initialize Spark Session with GPU Optimizations

Configure a Spark session to leverage GPU acceleration. Critical settings include:

  • Kryo Serialization: Reduces data shuffling overhead.
  • Memory Allocation: 80GB executor memory for handling large datasets.
  • Shuffle Partitions: 20,000 partitions to optimize parallel processing by reducing heavy spilling in the shuffle stage.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.executor.memory", "80G") \
    .config("spark.sql.shuffle.partitions", "20000") \
    .getOrCreate()
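
Before running the pipeline, it is worth a quick sanity check that the RAPIDS plugin is actually loaded in the session. This is an illustrative check, assuming the cluster was provisioned with the configuration shown earlier:

# Confirm the RAPIDS SQL plugin is loaded and enabled for this session.
print(spark.conf.get("spark.plugins"))             # expected: com.nvidia.spark.SQLPlugin
print(spark.conf.get("spark.rapids.sql.enabled"))  # expected: true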

Step 2: Load and Prepare Data

Load the datasets (customers, terminals, transactions) from Amazon S3 and repartition them for distributed processing:

  • Transactions: 1,000 partitions to balance workload.
  • Terminals: Broadcasted to all nodes for efficient joins.
customers_df = spark.read.parquet(customers_path).repartition(300)
terminals_df = spark.read.parquet(terminals_path)
transactions_df = spark.read.parquet(transactions_path).repartition(1000)

Step 3: Convert TX_DATETIME to Timestamp

Convert the TX_DATETIME column to a timestamp type for time-based feature extraction:

transactions_df = transactions_df.withColumn(
    "TX_DATETIME", F.col("TX_DATETIME").cast("timestamp"))

Step 4: Extract Date Components

Split the TX_DATETIME column into year, month, and day columns for additional temporal features:

transactions_df = transactions_df.withColumn("yyyy", year(F.col("TX_DATETIME"))) \
                                 .withColumn("mm", month(F.col("TX_DATETIME"))) \
                                 .withColumn("dd", dayofmonth(F.col("TX_DATETIME")))

Step 5: Define Time Windows for Feature Engineering

Define time windows for rolling aggregations (e.g., 15 minutes, 1 day, 30 days):

time_windows = {
    "15min": 15 * 60,
    "30min": 30 * 60,
    "60min": 60 * 60,
    "1day": 24 * 60 * 60,
    "7day": 7 * 24 * 60 * 60,
    "15day": 15 * 24 * 60 * 60,
    "30day": 30 * 24 * 60 * 60

Step 6: Add Window Features

Use GPU-accelerated window functions to compute rolling transaction counts and average amounts for each customer and terminal:

def add_window_features(transactions_df, time_windows, entity_id_col, prefix):
    for window_name, window_duration in time_windows.items():
        window_spec = Window.partitionBy(entity_id_col).orderBy(
            F.col("TX_DATETIME").cast("long")).rangeBetween(-window_duration, 0)
        transactions_df = transactions_df.withColumn(
            f"{prefix}_nb_txns_{window_name}_window", F.count("*").over(window_spec))
        transactions_df = transactions_df.withColumn(
            f"{prefix}_avg_amt_{window_name}_window", F.avg("TX_AMOUNT").over(window_spec))
    return transactions_df

transactions_df = add_window_features(transactions_df, time_windows, "CUSTOMER_ID", "customer_id")
transactions_df = add_window_features(transactions_df, time_windows, "TERMINAL_ID", "terminal_id")

Step 7: Categorical Encoding

Encode string columns (e.g., CUSTOMER_ID, merchant) using StringIndexer for model compatibility:

from pyspark.ml.feature import StringIndexer

customer_indexer = StringIndexer(inputCol="CUSTOMER_ID", outputCol="CUSTOMER_ID_index")
transactions_df = customer_indexer.fit(transactions_df).transform(transactions_df)

Step 8: One-Hot Encoding for Fraud Labels

One-hot encode the TX_FRAUD column to prepare for binary classification:

transactions_df = transactions_df.withColumn("TX_FRAUD_0", (F.col("TX_FRAUD") == 0).cast("int"))
transactions_df = transactions_df.withColumn("TX_FRAUD_1", (F.col("TX_FRAUD") == 1).cast("int"))

Step 9: Join Datasets

Join the enriched transactions data with customer and terminal details:

final_df = transactions_df.join(customers_df, on="CUSTOMER_ID_index", how="left") \
                          .join(terminals_df, on="TERMINAL_ID_index", how="left")
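
Because the terminals dataset is small enough to be broadcast to every node (see Step 2), you can also make the broadcast explicit so Spark avoids shuffling the much larger transactions data for that join. A minimal variation of the join above, assuming the same column names:

from pyspark.sql import functions as F

# Explicitly broadcast the small terminals table; the customers join is left unchanged.
final_df = transactions_df.join(customers_df, on="CUSTOMER_ID_index", how="left") \
                          .join(F.broadcast(terminals_df), on="TERMINAL_ID_index", how="left")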

Step 10: Save Final Dataset

Save the final dataset to S3 in Parquet format for downstream analytics:

final_df.write.mode("overwrite").parquet("s3://path/to/output/")
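
If downstream consumers typically filter by date, an optional refinement (not part of the original notebook) is to partition the output by the yyyy, mm, and dd columns derived in Step 4, so that date-filtered reads can prune partitions:

# Partition the Parquet output by the date components created in Step 4.
final_df.write.mode("overwrite") \
    .partitionBy("yyyy", "mm", "dd") \
    .parquet("s3://path/to/output/")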

Performance Benchmarks and Cost Efficiency

As highlighted in the results below, the GPU-accelerated workflow on AWS delivers substantial gains:

| Primary Instance Type | Primary Node Count | Primary Node Hourly Cost | Core Instance Type | Core Node Count | Core Node Hourly Cost | Run Time (Minutes) | Run Time (Hours) | Total Cost |
|---|---|---|---|---|---|---|---|---|
| CPU (M5.xlarge) | 1 | $0.192 | GPU (G6.4xlarge) | 12 | $1.323 | 43 | 0.72 | $11.517 |
| CPU (M5.xlarge) | 1 | $0.192 | CPU (R7i.4xlarge) | 12 | $1.058 | 450 | 7.50 | $96.660 |
| CPU (M5.xlarge) | 1 | $0.192 | CPU (R7a.4xlarge) | 12 | $1.217 | 246 | 4.10 | $60.664 |

Important points:

  • Up to a 10.5x speedup (43 minutes versus 450 minutes) with the GPU-based workflow, enabling real-time fraud alerts.
  • An 88% reduction in infrastructure costs, driven by the shorter runtime and optimized resource usage.

Conclusion

The RAPIDS Accelerator for Apache Spark, integrated with AWS, transforms fraud detection pipelines by delivering unmatched performance and cost efficiency. By accelerating critical operations like window functions, joins, and encoding, financial institutions can process terabytes of transactional data in minutes rather than hours. Faster feature engineering also makes it practical to experiment with new features and identify the most predictive ones for model training. The benchmarks above underscore the RAPIDS Accelerator's ability to reduce costs by up to 88% compared to CPU-based workflows, while its scalability ensures readiness for future data growth.

For organizations aiming to stay ahead of sophisticated fraud tactics, adopting GPU-accelerated workflows is no longer optional—it’s imperative. By combining the RAPIDS Accelerator for Apache Spark with AWS’s elastic infrastructure, businesses achieve real-time insights, minimize losses, and build a future-proof fraud detection system.

Join our sessions at NVIDIA GTC, taking place in San Jose, CA from March 17 to 21.

  • Advancing Transaction Fraud Detection in Financial Services With NVIDIA RAPIDS on AWS
  • Reduce Fraud and Financial Crime With Featurespace ARIC Risk Hub on AWS

Learn more about the NVIDIA Fraud Detection AI Workflow

Ready to supercharge your pipeline? Explore the RAPIDS Accelerator for Apache Spark and AWS GPU instances today.

[1] “Online Payment Fraud: Emerging Threats, Segment Analysis, & Market Forecasts 2023-2028,” Juniper Research, June 2023

Dr. Angelos Chionis

Dr. Angelos Chionis is a Senior Analytics Solutions Architect at Amazon Web Services (AWS), specializing in helping enterprise customers industrialize their data to build scalable analytics and machine learning (ML) applications. With a PhD in Computer Engineering, he has also been a guest lecturer in applied mathematics and computer engineering at various universities. With over 15 years of experience, Angelos has been at the forefront of inventing, designing, and delivering end-to-end, production-level analytics and ML solutions across industries such as energy, retail, healthcare, finance, and media. His expertise spans strategy, architecture, and hands-on implementation, enabling organizations to unlock the full potential of their data. Beyond his professional work, Angelos enjoys spending time with family and friends and pursuing his passion for collecting and working on cars.

Amar Yashlala

Amar is a Senior Analytics Solutions Architect at AWS specializing in building modern data analytics for large enterprises, and is passionate about helping organizations leverage cloud-native analytics solutions to drive business value. When he is not working, he loves to travel and take photographs.

Bailey Thompson

Bailey is a Partner Solutions Architect specializing in prototyping solutions across various industries. With a focus on designing and validating new architectures, she helps partners accelerate their cloud adoption and bring complex AI/ML-driven solutions to life. In her free time, she loves traveling to new places and seeking out the perfect cup of coffee.

Hao Zhu

Hao Zhu is a Senior Manager for Accelerated Spark Applications at NVIDIA. Hao and his team mainly cover customer engagement and application development for the RAPIDS Accelerator for Apache Spark. Hao is experienced in Hadoop, databases, massively parallel processing query engines, and more.

Roman Yokunda Enzmann

Roman is a Solutions Architect at NVIDIA specializing in accelerated data science. He helps customers deploy AI/ML-based solutions, focusing on accelerating end-to-end data science pipelines with RAPIDS and the RAPIDS Accelerator for Apache Spark. Roman has a background in explainable AI and automating credit risk solutions.