
2023年11月06日
指令流水线技术被用于CPU架构中,以提高CPU在给定时钟速率下的吞吐量。同样的流水线概念也可以用于增加流处理系统的吞吐量。
流处理系统处理输入数据流以生成输出数据流。这些系统通常用于执行诸如数据转换、过滤、数据增强等操作。所有这些系统通常遵循相同的高级算法,从上游拉取事件,处理事件,并将处理后的事件推送到下游。
该算法可以在单线程执行模型中实现,其中一个线程从上游拉取事件,处理事件,最终将处理后的事件推送到下游。在这种实现中,执行线程在将所有处理后的事件推送到下游之前,无法从上游拉取更多事件。
单线程执行模型虽然简单易懂且易于实现,但并不能提供最大可能的吞吐量。假设从上游拉取1000个事件需要约250毫秒,处理这些事件需要约500毫秒,再花费约250毫秒将处理后的事件推送到下游。因此,我们系统的吞吐量约为每秒1000个事件。
假设我们要处理六个事件集,上述代码将大约需要六秒来执行端到端的算法。
如同CPU架构中定义的指令流水线是一种在单个处理器内实现指令级并行性的技术。对于流处理系统,相同的概念也适用。我们可以将高级算法分解为三个指令集,这三个指令集可以同时执行。
我们可以不再让一个线程执行整个算法,而是让三个线程分别处理一个指令集,并使用共享队列进行协调。
在任何时刻,这三个线程都将在工作,从而确保我们从该实现中获得最大的吞吐量。
以下代码让我们测量流水线实现的吞吐量:
public class MultiThreadedStreamProcessorTest { public static void main(String[] args) { MultiThreadedStreamProcessor multiThreadedStreamProcessor = new MultiThreadedStreamProcessor(); long startTime = System.currentTimeMillis(); // 添加输入 int totalEventSets = 6; for (int i = 0; i < totalEventSets; i++) { multiThreadedStreamProcessor.consumeAnEventSet("set" + (i+1)); } // 等待直到所有工作完成 while (true) { if (multiThreadedStreamProcessor.getOutputEventsQueue().size() == totalEventSets) { System.out.println("所有工作已完成"); break; } } long endTime = System.currentTimeMillis(); System.out.println("耗时:" + (endTime-startTime) + "毫秒" ); } }
输出:
拉取 set1 拉取 set2 处理 set1 拉取 set3 处理 set2 发布 set1 拉取 set4 拉取 set5 处理 set3 发布 set2 拉取 set6 处理 set4 发布 set3 处理 set5 发布 set4 发布 set5 处理 set6 发布 set6 所有工作已完成 耗时:3545毫秒
上述示例显示,流水线实现提供了大约2倍的吞吐量,即3.5秒,相比非流水线实现的六秒。
本文链接:https://www.iokks.com/art/a2e1c99dfa90
本博客所有文章除特别声明外,均采用CC BY 4.0 CN协议 许可协议。转载请注明出处!