news 2026/6/10 18:52:11

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务率先运行(一)

假设现在有一种场景,在一个任务接收器中,源源不断且不知道任务发送者何时会将新任务发送过来,每个任务都具备不同的任务优先级,任务无时无刻的进入任务缓冲池,目的是把任务缓冲池中优先级最高的那个任务挑选出来最先运行。

import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import java.util.UUID fun main() { val myThreadPool = newFixedThreadPoolContext(4, "my-thread") val bufferCapacity = 5 val totalTaskSize = 15 val channel = Channel<TaskInfo>() val taskList = mutableListOf<TaskInfo>() runBlocking { //接收任务 async { channel.receiveAsFlow() .buffer(bufferCapacity) .onEach { it -> //生产者 println("onEach $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") taskList.add(it) }.flowOn(myThreadPool) .collect { it -> //消费者 println("collect $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") val newOrderList = taskList.sortedBy { it.priority } newOrderList.forEach { print("${it.priority} ") } val lastTaskInfo = newOrderList.lastOrNull() println("\n最大优先级任务:$lastTaskInfo") taskList.remove(lastTaskInfo) loader(lastTaskInfo!!) } } //源源不断的密集发送加载任务。 async { repeat(totalTaskSize) { it -> enqueue(channel, it) } } } } private suspend fun enqueue(channel: Channel<TaskInfo>, id: Int) { val taskInfo = TaskInfo(id, (Math.random() * 9999).toInt()) println("enqueue $taskInfo") channel.send(taskInfo) } //假设这里是真正的耗时任务执行体 private suspend fun loader(info: TaskInfo) { println("load start $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") delay(500) println("load end $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") } private class TaskInfo { var id = 0 var priority = 0 private val taskId = UUID.randomUUID() constructor(id: Int, priority: Int) { this.id = id this.priority = priority } override fun equals(other: Any?): Boolean { return taskId == (other as TaskInfo).taskId } override fun toString(): String { return "TaskInfo(id=$id, priority=$priority)" } }

输出:

enqueue TaskInfo(id=0, priority=7947)
enqueue TaskInfo(id=1, priority=1045)
enqueue TaskInfo(id=2, priority=4478)
onEach TaskInfo(id=0, priority=7947) at time=1765979341859 my-thread-2
onEach TaskInfo(id=1, priority=1045) at time=1765979341859 my-thread-2
onEach TaskInfo(id=2, priority=4478) at time=1765979341859 my-thread-2
enqueue TaskInfo(id=3, priority=5964)
enqueue TaskInfo(id=4, priority=2658)
onEach TaskInfo(id=3, priority=5964) at time=1765979341859 my-thread-4
onEach TaskInfo(id=4, priority=2658) at time=1765979341859 my-thread-4
enqueue TaskInfo(id=5, priority=3495)
onEach TaskInfo(id=5, priority=3495) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=6, priority=1461)
onEach TaskInfo(id=6, priority=1461) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=7, priority=4860)
onEach TaskInfo(id=7, priority=4860) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=8, priority=7226)
onEach TaskInfo(id=8, priority=7226) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=9, priority=1939)
enqueue TaskInfo(id=10, priority=133)
onEach TaskInfo(id=9, priority=1939) at time=1765979341861 my-thread-3
onEach TaskInfo(id=10, priority=133) at time=1765979341861 my-thread-3
enqueue TaskInfo(id=11, priority=1818)
enqueue TaskInfo(id=12, priority=7695)
onEach TaskInfo(id=11, priority=1818) at time=1765979341861 my-thread-2
onEach TaskInfo(id=12, priority=7695) at time=1765979341861 my-thread-2
enqueue TaskInfo(id=13, priority=4365)
onEach TaskInfo(id=13, priority=4365) at time=1765979341862 my-thread-4
enqueue TaskInfo(id=14, priority=4889)
onEach TaskInfo(id=14, priority=4889) at time=1765979341862 my-thread-2
collect TaskInfo(id=0, priority=7947) at time=1765979341862 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695 7947
最大优先级任务:TaskInfo(id=0, priority=7947)
load start TaskInfo(id=0, priority=7947) @time=1765979341887 main
load end TaskInfo(id=0, priority=7947) @time=1765979342391 main
collect TaskInfo(id=1, priority=1045) at time=1765979342392 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695
最大优先级任务:TaskInfo(id=12, priority=7695)
load start TaskInfo(id=12, priority=7695) @time=1765979342392 main
load end TaskInfo(id=12, priority=7695) @time=1765979342901 main
collect TaskInfo(id=2, priority=4478) at time=1765979342901 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226
最大优先级任务:TaskInfo(id=8, priority=7226)
load start TaskInfo(id=8, priority=7226) @time=1765979342902 main
load end TaskInfo(id=8, priority=7226) @time=1765979343412 main
collect TaskInfo(id=3, priority=5964) at time=1765979343412 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964
最大优先级任务:TaskInfo(id=3, priority=5964)
load start TaskInfo(id=3, priority=5964) @time=1765979343412 main
load end TaskInfo(id=3, priority=5964) @time=1765979343922 main
collect TaskInfo(id=4, priority=2658) at time=1765979343922 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889
最大优先级任务:TaskInfo(id=14, priority=4889)
load start TaskInfo(id=14, priority=4889) @time=1765979343923 main
load end TaskInfo(id=14, priority=4889) @time=1765979344433 main
collect TaskInfo(id=5, priority=3495) at time=1765979344433 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860
最大优先级任务:TaskInfo(id=7, priority=4860)
load start TaskInfo(id=7, priority=4860) @time=1765979344434 main
load end TaskInfo(id=7, priority=4860) @time=1765979344943 main
collect TaskInfo(id=6, priority=1461) at time=1765979344943 main
133 1045 1461 1818 1939 2658 3495 4365 4478
最大优先级任务:TaskInfo(id=2, priority=4478)
load start TaskInfo(id=2, priority=4478) @time=1765979344943 main
load end TaskInfo(id=2, priority=4478) @time=1765979345452 main
collect TaskInfo(id=7, priority=4860) at time=1765979345452 main
133 1045 1461 1818 1939 2658 3495 4365
最大优先级任务:TaskInfo(id=13, priority=4365)
load start TaskInfo(id=13, priority=4365) @time=1765979345452 main
load end TaskInfo(id=13, priority=4365) @time=1765979345960 main
collect TaskInfo(id=8, priority=7226) at time=1765979345960 main
133 1045 1461 1818 1939 2658 3495
最大优先级任务:TaskInfo(id=5, priority=3495)
load start TaskInfo(id=5, priority=3495) @time=1765979345960 main
load end TaskInfo(id=5, priority=3495) @time=1765979346467 main
collect TaskInfo(id=9, priority=1939) at time=1765979346467 main
133 1045 1461 1818 1939 2658
最大优先级任务:TaskInfo(id=4, priority=2658)
load start TaskInfo(id=4, priority=2658) @time=1765979346467 main
load end TaskInfo(id=4, priority=2658) @time=1765979346973 main
collect TaskInfo(id=10, priority=133) at time=1765979346973 main
133 1045 1461 1818 1939
最大优先级任务:TaskInfo(id=9, priority=1939)
load start TaskInfo(id=9, priority=1939) @time=1765979346974 main
load end TaskInfo(id=9, priority=1939) @time=1765979347482 main
collect TaskInfo(id=11, priority=1818) at time=1765979347482 main
133 1045 1461 1818
最大优先级任务:TaskInfo(id=11, priority=1818)
load start TaskInfo(id=11, priority=1818) @time=1765979347483 main
load end TaskInfo(id=11, priority=1818) @time=1765979347986 main
collect TaskInfo(id=12, priority=7695) at time=1765979347986 main
133 1045 1461
最大优先级任务:TaskInfo(id=6, priority=1461)
load start TaskInfo(id=6, priority=1461) @time=1765979347987 main
load end TaskInfo(id=6, priority=1461) @time=1765979348498 main
collect TaskInfo(id=13, priority=4365) at time=1765979348498 main
133 1045
最大优先级任务:TaskInfo(id=1, priority=1045)
load start TaskInfo(id=1, priority=1045) @time=1765979348498 main
load end TaskInfo(id=1, priority=1045) @time=1765979349006 main
collect TaskInfo(id=14, priority=4889) at time=1765979349006 main
133
最大优先级任务:TaskInfo(id=10, priority=133)
load start TaskInfo(id=10, priority=133) @time=1765979349007 main
load end TaskInfo(id=10, priority=133) @time=1765979349513 main

相关:

https://blog.csdn.net/zhangphil/article/details/154843029

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/8 15:58:09

大规模分布式系统性能优化的5大实战技巧

大规模分布式系统性能优化的5大实战技巧 【免费下载链接】apollo 项目地址: https://gitcode.com/gh_mirrors/ap/apollo 随着业务规模的快速扩张&#xff0c;分布式系统在支撑数万节点时常常面临性能瓶颈&#xff1a;响应延迟飙升、资源耗尽、系统稳定性下降。本文基于…

作者头像 李华
网站建设 2026/6/9 21:24:33

17、Linux文本文件操作全解析

Linux文本文件操作全解析 1. 文件类型检测 在脚本编程中, file 命令是检测文件类型的重要工具。它有许多实用的选项: - -b (brief)选项:隐藏文件名,只返回文件评估结果。例如: $ file -b orders.txt ASCII text-f (file)选项:从特定文件读取文件名。 -i …

作者头像 李华
网站建设 2026/6/10 13:59:08

PRQL现代化查询语言终极指南:从SQL复杂性到数据查询新体验

PRQL现代化查询语言终极指南&#xff1a;从SQL复杂性到数据查询新体验 【免费下载链接】prql PRQL/prql: 是一个类似于 SQL 的查询语言实现的库。适合用于查询各种数据库和数据格式。特点是支持多种数据库类型&#xff0c;提供了类似于 SQL 的查询语言。 项目地址: https://g…

作者头像 李华
网站建设 2026/6/10 13:35:18

EmotiVoice语音合成的艺术性探索

EmotiVoice语音合成的艺术性探索 在虚拟主播的一场直播中&#xff0c;观众弹幕突然刷起“心疼你”&#xff0c;镜头前的3D形象眼眶微红&#xff0c;声音也从欢快转为低沉&#xff1a;“是啊……我也觉得有点难过。”这句回应并非预录&#xff0c;而是由AI实时生成——语调中的颤…

作者头像 李华
网站建设 2026/6/10 1:04:29

工业场景实战案例--wifi联网

我是嵌入式学习菌&#xff0c;一名热爱学习的嵌入式工程师关注我&#xff0c;一起变得更加优秀&#xff01;嵌入式学习菌CSDN、B 站视频号同名同步分享嵌入式学习点滴&#xff5e; 无捷径唯有坚持&#xff0c;愿与你并肩稳步前行&#xff01;17篇原创内容公众号下面结合工业场景…

作者头像 李华
网站建设 2026/6/10 16:14:07

ATI显卡驱动下载与安装方法 新手必看指南

ATI 显卡&#xff08;现归属于 AMD&#xff09;因其良好的图形处理能力和性价比&#xff0c;被广泛应用于办公电脑、设计工作站及游戏设备中。显卡驱动作为连接硬件与系统的重要桥梁&#xff0c;若版本不匹配或安装异常&#xff0c;容易导致分辨率异常、画面卡顿甚至系统崩溃。…

作者头像 李华