How Cigna Tuned Its Spark Streaming App for Real-time Processing with Apache Kafka

Explore the configuration changes that Cigna’s Big Data Analytics team has made to optimize the performance of its real-time architecture.

Real-time stream processing with Apache Kafka as a backbone provides many benefits. For example, this architectural pattern can handle massive, organic data growth via the dynamic addition of streaming sources such as mobile devices, web servers, system logs, and wearable device data (aka, the “Internet of Things”). Kafka can also help capture data in real time and enable the proactive analysis of that data through Spark Streaming.

At Cigna Corporation, we have implemented such a real-time architecture based on Kafka and Apache Spark/Spark Streaming. As a result, we can capture many different types of events and react to them in real time, and utilize machine-learning algorithms that constantly learn from new data as it arrives. We can also enrich data that arrives in batch with data that was just created in real time.

This approach required significant tuning of our Spark Streaming application for optimal performance, however. In the remainder of this post, we’ll describe the details of that tuning. (Note: these results are specific to Cigna’s environment; your mileage may vary. But, they provide a good starting point for experimentation.)

Architecture Overview

As you may know, Spark Streaming enables the creation of real-time complex event processing architectures, and Kafka is a real-time, fault-tolerant, highly scalable data pipeline (or pub-sub system) for data streams. Spark Streaming can be configured to consume topics from Kafka and create corresponding Kafka DStreams. Each DStream batches incoming messages into an abstraction called an RDD, an immutable, distributed collection of those messages; each RDD represents one micro-batch, and the micro-batching window is configurable.
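
As a rough illustration of that wiring, the sketch below uses the Spark 1.5-era receiver-based Kafka API with a one-minute batch window. The ZooKeeper quorum, consumer group, and topic name are hypothetical placeholders, not Cigna's actual settings.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EventStreamSketch")
    // One-minute micro-batching window, as described above
    val ssc = new StreamingContext(conf, Seconds(60))

    // Receiver-based Kafka DStream; ZooKeeper quorum, consumer group, and
    // topic/thread map below are placeholder values
    val events = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181,zk3:2181",
      "event-consumer-group",
      Map("portal_events" -> 1))

    // Each RDD here is one micro-batch of (key, message) pairs
    events.foreachRDD { rdd =>
      // parse, enrich, and persist the batch (see the enrichment sketch below)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}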

As illustrated below, most of the events we track originate from different types of portals (1). When users visit monitored pages or links, those events are captured and sent to Kafka through an Apache Flume agent (2). A Spark Streaming application then reads that data asynchronously and in parallel (3), with the Kafka events arriving in mini-batches (a one-minute window). The streaming application parses the semi-structured events, enriches them with data from a large (125 million-record) Apache Hive table (4) that is read via Spark’s HiveContext and cached in memory, and uses the DataFrame API to build a structure around the rest of each event. Once a record is built, the application persists the DataFrame as a row in a Hive table; the physical layout of the files in that table is Apache Parquet, and the rows are served up as a JSON response over HTTP to enterprise services. The same information can also be written back to a Kafka topic (5), making the enriched events available to other applications and systems in real time.
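
Continuing the sketch above (replacing the foreachRDD placeholder), the enrichment and persistence steps might look roughly like this. The table names, join key, and JSON parsing are assumptions for illustration, not Cigna's actual schema or code.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SaveMode

// Inside the streaming application, after the StreamingContext is created
val hiveContext = new HiveContext(ssc.sparkContext)

// Read the large reference table once and cache it in executor memory
val reference = hiveContext.table("reference_db.member_reference").cache()

events.foreachRDD { rdd =>
  // Build a DataFrame around the parsed, semi-structured events
  // (assuming the event payloads are JSON strings)
  val parsed = hiveContext.read.json(rdd.map { case (_, value) => value })

  // Enrich the events with the cached reference data
  val enriched = parsed.join(reference, parsed("member_id") === reference("member_id"))

  // Append the enriched rows to a Parquet-backed Hive table (assumed to exist)
  enriched.write.mode(SaveMode.Append).insertInto("events_db.enriched_events")
}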

This data is accessible through a RESTful API or via JDBC/ODBC (6). For example, the enriched data can be served as JSON over HTTP, a simple but effective low-level data service. Dashboard tools such as Looker and Tableau running over Impala also let users query and visualize live event data that has been enriched and processed in real time.

Summary of Optimizations

When we first deployed the entire solution, the Kafka and Flume components were performing well, but the Spark Streaming application was taking 4-8 minutes, depending on the resources allocated, to process a single batch. This latency was caused by the use of DataFrames to enrich the data from the very large Hive table mentioned previously, and by various undesirable configuration options.

To optimize our processing time, we started down two paths: First, to cache data and partition it where appropriate, and second to tune the Spark application via configuration changes. (We also packaged this application as a Cloudera Manager service by creating a Custom Service Descriptor and parcel, but that step is out of scope for this post.)

The spark-submit command we use to run the Spark app is shown below. It reflects all the options that, together with coding improvements, resulted in significantly less processing time: from 4-8 minutes to under 25 seconds.

/opt/app/dev/spark-1.5.2/bin/spark-submit \
--jars /opt/cloudera/parcels/CDH/jars/zkclient-0.3.jar,/opt/cloudera/parcels/CDH/jars/kafka_2.10-0.8.1.1.jar,/opt/app/dev/jars/datanucleus-core-3.2.2.jar,/opt/app/dev/jars/datanucleus-api-jdo-3.2.1.jar,/opt/app/dev/jars/datanucleus-rdbms-3.2.1.jar \
--files /opt/app/dev/spark-1.5.2/conf/hive-site.xml,/opt/app/dev/jars/log4j-eir.properties \
--queue spark_service_pool \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \
--conf "spark.sql.tungsten.enabled=false" \
--conf "spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.sql.codegen=false" \
--conf "spark.sql.unsafe.enabled=false" \
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=1500ms" \
--conf "spark.shuffle.consolidateFiles=true" \
--driver-memory 10G \
--executor-memory 8G \
--executor-cores 20 \
--num-executors 20 \
--class com.bigdata.streaming.OurApp \
/opt/app/dev/jars/OurStreamingApplication.jar external_props.conf

Next, we’ll describe the configuration changes and caching approach in detail.

Driver Options

Note that the driver runs on the cluster (we run Spark on YARN in cluster deploy mode). Because Spark Streaming applications are long running, the log files they generate can become very large. To address this, we limited the number of messages written to the logs and used a RollingFileAppender to cap their size. We also disabled console progress messages by turning off the spark.ui.showConsoleProgress option.
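
For illustration, a RollingFileAppender can be set up in the log4j properties file passed via -Dlog4j.configuration. The file name, size limits, and log level below are placeholder values, not our production settings.

log4j.rootLogger=WARN, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
log4j.appender.rolling.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.rolling.MaxFileSize=50MB
log4j.appender.rolling.MaxBackupIndex=5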

Also, during testing, our driver frequently ran out of memory due to the permanent generation space filling up. (The permanent generation is where the classes, methods, interned strings, and similar objects used by the VM are stored and never de-allocated.) Increasing the permanent generation size to 6GB solved the problem:

spark.driver.extraJavaOptions=-XX:MaxPermSize=6G

Garbage Collection

Because our streaming application is a long-running process, we noticed long GC pauses after a period of processing that we wanted to either minimize or push into the background. Enabling the concurrent mark-sweep collector with the UseConcMarkSweepGC flag seemed to do the trick:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \

(The G1 collector is being considered for a future release.)

Disabling Tungsten

Tungsten is a major revamp of the Spark execution engine and, as such, could prove problematic in its first release. Disabling Tungsten simplified our Spark SQL DAGs somewhat. We will re-evaluate Tungsten when it is more hardened in version 1.6, especially if it optimizes shuffles.

We disabled the following flags:

spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false

Enabling Backpressure

Spark Streaming runs into trouble when the batch-processing time exceeds the batch interval. In other words, if Spark cannot process data from the topic as fast as it arrives, the Kafka receiver on the executor falls behind. If this condition is sustained long enough, the memory of the receiver’s executor overflows and the application becomes unstable.

Setting the following alleviated this issue:

spark.streaming.backpressure.enabled=true

Adjusting Locality and Blocks

These two options are complementary: One determines how long to wait for the data to be local to a task/executor, and the other is used by the Spark Streaming receivers to chunk data into blocks. The larger the data blocks the better, but if the data is not local to the executors, it will have to move over the network to wherever the task will be executed. We had to find a good balance between these two options because we don’t want the data blocks to be large, nor do we want to wait too long for locality. We want all the tasks in our application to finish within seconds.

Thus, we reduced the locality wait from the default 3 seconds to 1 second, so a task waits at most 1 second for a data-local slot on one of the 20 executors before being scheduled elsewhere. We also set the block interval to 1.5 seconds.

--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=1500ms" \

Consolidating Intermediate Files

Enabling this flag is recommended for ext4 filesystems because it results in fewer intermediate files, thereby improving filesystem performance for shuffles.

--conf "spark.shuffle.consolidateFiles=true" \

Enabling Executor Performance

While configuring a Kafka DStream, you can specify the number of parallel consumer threads. However, the consumer threads of a single DStream all run within one receiver, on a single node. Thus, to consume a Kafka topic in parallel from multiple machines, you have to instantiate multiple DStreams. Although one approach would be to union the corresponding RDDs before processing (a sketch of that approach appears below), we found it cleaner to run multiple instances of the application and make them part of the same Kafka consumer group.
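
For reference, a minimal sketch of the union-of-DStreams alternative, continuing the earlier example; the receiver count, ZooKeeper quorum, consumer group, and topic name are again hypothetical.

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create several receiver-based DStreams in the same consumer group, so each
// receiver (running on its own executor) pulls a share of the topic partitions
val numReceivers = 4
val streams: Seq[DStream[(String, String)]] = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181,zk3:2181",
    "event-consumer-group", Map("portal_events" -> 1))
}

// Union them into a single DStream before parsing and enrichment
val unified = ssc.union(streams)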

To support that level of parallelism, we enabled 20 executors with 20 cores per executor:

--executor-memory 8G
--executor-cores 20
--num-executors 20

We provided about 8GB memory to each executor to ensure that cached data remains in memory, and that there is enough room for the heap to shrink and grow. Caching reference datasets in memory helps tremendously when we run heavy DataFrame joins. This approach also speeds up processing of batches: 7,000 of them in 17-20 seconds, according to a recent benchmark.

Caching Approach

Cache the RDD before using it, but remember to remove it from the cache to make room for the next batched iteration. Caching any data that is used multiple times beyond the foreach loop also helps a lot. In our case, we cached the 125 million records from our Hive table as a DataFrame, partitioned that data, and used it in multiple joins. That change shaved nearly 4 minutes from the total batch-processing time.

However, don’t make the number of partitions too large. Keeping the number of partitions low reduces the number of tasks, keeps scheduling delays to a minimum, and ensures that larger chunks of data are processed with minimal delay. To keep the number of partitions proportional to the number of executors and cores, we simply set:

# of executors * # of cores = # of partitions

For instance, (20 * 20) = 400 partitions. Once an RDD is no longer needed in memory, we call rdd.unpersist() to release it from the cache. A sketch of this pattern follows.
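
A rough sketch of this caching and partitioning pattern, continuing the earlier example; table and variable names are illustrative only.

// Read the large reference table once, partition it to match the available
// parallelism, and cache it for reuse across joins
val numPartitions = 20 * 20  // executors * cores
val reference = hiveContext.table("reference_db.member_reference")
  .repartition(numPartitions)
  .cache()

events.foreachRDD { rdd =>
  rdd.cache()       // reuse the micro-batch across several processing steps
  // ... parse, join against `reference`, and persist the results ...
  rdd.unpersist()   // release it before the next batch arrives
}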

(Note: The DataFrame API, although very effective, leaves a lot of processing to the underlying Spark system. In testing, we found that using the RDD API reduced processing time and excessive shuffling.)

Conclusion

Thanks to these changes, we now have a Spark Streaming app that is long running, uses resources responsibly, and can process real-time data within a few seconds. It provides:

  • Near real-time access to data + a view of history (batch) = all data
  • The ability to handle organic growth of data
  • The ability to proactively analyze data
  • Access to “continuous” data from many sources
  • The ability to detect an event when it actually occurs
  • The ability to combine events into patterns of behavior in real time

As next steps, we are now looking at ways to further optimize our joins, simplify our DAGs, and reduce the number of shuffles that can occur.  We are also experimenting with Kafka Direct Streams, which may give us the ability to control data flow and implement redundancy and resilience through check-pointing.

While Cigna’s journey with Kafka and Spark Streaming is only beginning, we are excited about the doors it opens in the realm of real-time data analytics, and our quest to help people live healthier, happier lives.

Mohammad Quraishi (@AtifQ) is a Senior Principal Technologist at Cigna Corporation and has 20 years of experience in application architecture, design, and development. He has specific experience in mobile native applications, SOA platform implementation, web development, distributed applications, object-oriented analysis and design, requirements analysis, data modeling, and database design.

Jeff Shmain is a Senior Solutions Architect at Cloudera.
