logo

流水线(Pipelining)增加流处理系统的吞吐量

224
2023年11月29日
流水线技术通过使用共享队列来协调独立线程之间的并行性,提高了流处理的吞吐量。

指令流水线技术被用于CPU架构中,以提高CPU在给定时钟速率下的吞吐量。同样的流水线概念也可以用于增加流处理系统的吞吐量。

流处理系统

流处理系统处理输入数据流以生成输出数据流。这些系统通常用于执行诸如数据转换、过滤、数据增强等操作。所有这些系统通常遵循相同的高级算法,从上游拉取事件,处理事件,并将处理后的事件推送到下游。

无流水线

该算法可以在单线程执行模型中实现,其中一个线程从上游拉取事件,处理事件,最终将处理后的事件推送到下游。在这种实现中,执行线程在将所有处理后的事件推送到下游之前,无法从上游拉取更多事件。

单线程执行模型虽然简单易懂且易于实现,但并不能提供最大可能的吞吐量。假设从上游拉取1000个事件需要约250毫秒,处理这些事件需要约500毫秒,再花费约250毫秒将处理后的事件推送到下游。因此,我们系统的吞吐量约为每秒1000个事件。

假设我们要处理六个事件集,上述代码将大约需要六秒来执行端到端的算法。

使用流水线

如同CPU架构中定义的指令流水线是一种在单个处理器内实现指令级并行性的技术。对于流处理系统,相同的概念也适用。我们可以将高级算法分解为三个指令集,这三个指令集可以同时执行。

我们可以不再让一个线程执行整个算法,而是让三个线程分别处理一个指令集,并使用共享队列进行协调。

  • 事件拉取线程: 该线程负责从上游拉取事件并将其写入本地内存队列,我们称之为“未处理事件队列”。
  • 事件处理器线程: 该线程负责从“未处理事件队列”中拉取事件,处理事件,并将处理后的事件写入另一个本地内存队列,我们称之为“已处理事件队列”。
  • 事件发布线程: 该线程负责从“已处理事件队列”中拉取事件并将其推送到下游。

在任何时刻,这三个线程都将在工作,从而确保我们从该实现中获得最大的吞吐量。

以下代码让我们测量流水线实现的吞吐量:

public class MultiThreadedStreamProcessorTest {

    public static void main(String[] args) {
        MultiThreadedStreamProcessor multiThreadedStreamProcessor = new MultiThreadedStreamProcessor();

        long startTime = System.currentTimeMillis();

        // 添加输入
        int totalEventSets = 6;
        for (int i = 0; i < totalEventSets; i++) {
            multiThreadedStreamProcessor.consumeAnEventSet("set" + (i+1));
        }

        // 等待直到所有工作完成
        while (true) {
            if (multiThreadedStreamProcessor.getOutputEventsQueue().size() == totalEventSets) {
                System.out.println("所有工作已完成");
                break;
            }
        }

        long endTime = System.currentTimeMillis();

        System.out.println("耗时:" + (endTime-startTime) + "毫秒" );
    }
}

输出:

拉取 set1
拉取 set2
处理 set1
拉取 set3
处理 set2
发布 set1
拉取 set4
拉取 set5
处理 set3
发布 set2
拉取 set6
处理 set4
发布 set3
处理 set5
发布 set4
发布 set5
处理 set6
发布 set6
所有工作已完成
耗时:3545毫秒

上述示例显示,流水线实现提供了大约2倍的吞吐量,即3.5秒,相比非流水线实现的六秒。

本文链接:https://www.iokks.com/art/a2e1c99dfa90
本博客所有文章除特别声明外,均采用CC BY 4.0 CN协议 许可协议。转载请注明出处!