PriorityQueue

概念 优先级队列(Priority Queue)是一种特殊的队列,队列中每个元素都附有一个"优先级"。出队时,不再遵循普通队列的 FIFO(先进先出)原则,而是优先级最高(或最低)的元素先出队。 直观类比:医院的急诊室——无论谁先到,病情最重的患者优先就诊。 核心特性 插入(insert / enqueue):将元素连同优先级一起加入队列 取最值(peek):查看优先级最高的元素,不移除 取出最值(poll / dequeue):取出并移除优先级最高的元素 不保证全局有序:内部结构只保证堆顶(最值)正确,其余元素顺序不确定 内部实现:二叉堆 绝大多数语言的标准库都用**二叉堆(Binary Heap)**实现优先级队列。 堆的结构 用完全二叉树表示,通常以数组存储 最小堆(Min-Heap):父节点 ≤ 子节点,根为最小值 最大堆(Max-Heap):父节点 ≥ 子节点,根为最大值 数组索引关系(从 0 开始): 节点 索引 父节点 (i-1) / 2 左子节点 2*i + 1 右子节点 2*i + 2 核心操作:上浮与下沉 上浮(Sift Up):插入新元素时,将其放在数组末尾,然后与父节点比较,不满足堆性质则交换,直到满足为止。 下沉(Sift Down):取出堆顶后,将末尾元素移到堆顶,然后与子节点比较,不满足堆性质则与较小(或较大)的子节点交换,直到满足为止。 时间复杂度 操作 时间复杂度 插入 $O(\log n)$ 取出最值 $O(\log n)$ 查看最值 $O(1)$ 建堆(heapify) $O(n)$ 两种变体 最小优先级队列(Min Priority Queue) 优先级数值最小的元素最先出队。例如:Dijkstra 最短路径算法中,每次选取距离最小的节点。 最大优先级队列(Max Priority Queue) 优先级数值最大的元素最先出队。例如:任务调度中,优先执行优先级最高的任务。 ...

2026-03-20 · 1 min · 125 words · -

延时队列

“延时队列” https://zhuanlan.zhihu.com/p/266156267 延迟队列 首先,队列这种数据结构相信大家都不陌生,它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列中的元素会被优先取出进行消费; 延时队列相比于普通队列最大的区别就体现在其延时的属性上, 普通队列的元素是先进先出,按入队顺序进行处理, 而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。 应用场景 我在开发业务需求时遇到的使用场景是这样的,用户可以在小程序中订阅不同的微信或者 QQ 的模板消息,产品同学可以在小程序的管理端新建消息推送计划,当到达指定的时间节点的时候给所有订阅模板消息的用户进行消息推送。 这里不太懂…. 把要发送的模板消息加到延时队列里? 如果仅仅是服务单一的小程序,那也许起个定时任务,或者甚至人工的定时去执行能够最便捷最快速的去完成这项需求,但我们希望能够抽象出一个消息订阅的模块服务出来给所有业务使用,这时候就需要一种通用的系统的解决方案,这时候便需要使用到延迟队列了。 除了上述我所遇到的这样的典型的需求以外,延迟队列的应用场景其实也非常的广泛,比如说以下的场景: 新建的订单,如果用户在 15 分钟内未支付,则自动取消。 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。 对于数据量比较少并且时效性要求不那么高的场景,一种比较简单的方式是轮询数据库,比如每秒轮询一下数据库中所有数据,处理所有到期的数据,比如如果我是公司内部的会议预定系统的开发者,我可能就会采用这种方案,因为整个系统的数据量必然不会很大并且会议开始前提前 30 分钟提醒与提前 29 分钟提醒的差别并不大。 但是如果需要处理的数据量比较大实时性要求比较高,比如淘宝每天的所有新建订单 15 分钟内未支付的自动超时,数量级高达百万甚至千万,这时候如果你还敢轮询数据库怕是要被你老板打死,不被老板打死估计也要被运维同学打死。 按时间戳排序,每秒查询15分钟之内创建的订单? 这种场景下,就需要使用到我们今天的主角 —— 延迟队列了。延迟队列为我们提供了一种高效的处理大量需要延迟消费消息的解决方案。那么话不多说,下面我们就来看一下几种常见的延迟队列的解决方案以及他们各自的优缺点。 实现方案 Redis ZSet 我们知道 Redis 有一个有序集合的数据结构 ZSet,ZSet 中每个元素都有一个对应 Score,ZSet 中所有元素是按照其 Score 进行排序的。 那么我们可以通过以下这几个操作使用 Redis 的 ZSet 来实现一个延迟队列: 入队操作: ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是O(logN),N是 ZSet 中元素个数,因此我们能相对比较高效的进行入队操作。 起一个进程定时 (比如每隔一秒) 通过 ZRANGEBYSCORE 方法查询 ZSet 中 Score 最小的元素,具体操作为: ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES ...

2021-07-21 · 2 min · 248 words · -

LinkedBlockingQueue

LinkedBlockingQueue 基于链表的阻塞队列,同 ArrayBlockingQueue 类似,其内部也维持着一个数据缓冲队列 (该队列是一个链表) ,生产者存入的数据会缓存在队列内部,生产者立即返回;只有当队列缓冲区达到最大值缓存容量时 (LinkedBlockingQueue 可以通过构造函数指定该值) ,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端 分别采用了独立的锁 来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 作为开发者,我们需要注意的是,如果构造一个 LinkedBlockingQueue 对象,而没有指定其容量大小,LinkedBlockingQueue 会默认一个类似无限大小的容量 (Integer.MAX_VALUE) ,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。 此队列按 FIFO (先进先出) 排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。 新元素插入到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列, 但是在大多数并发应用程序中,其可预知的性能要低。 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。 如果未指定容量,默认容量为 Integer.MAX_VALUE ,容量范围可以在构造方法参数中指定作为防止队列过度扩展。 此对象是线程阻塞-线程安全的 不接受 null 元素 它实现了BlockingQueue接口。 实现了 Collection 和 Iterator 接口的所有可选方法。 在JDK5/6中,LinkedBlockingQueue和ArrayBlocingQueue等对象的poll(long timeout, TimeUnit unit)存在内存泄露Leak的对象AbstractQueuedSynchronizer.Node,据称JDK5会在Update12里Fix,JDK6会在Update2里Fix 下面介绍几种常用的方法: 定义一个输出方法: static void v(Object s){System.out.println(s.toString());} LinkedBlockingQueue bq=new LinkedBlockingQueue(); for(int i=0;i<100;i++) { bq.add(“i”+i);//如果空间已满,此方法会抛出异常,所以这就是put,或者offer方法的优势所在 } String s1=bq.take();//i0 String s2=bq.take();//i1 bq.offer(“ix”,5,TimeUnit.SECONDS);//在尾部插入一个元素,如果有必要 ,等待 指定的时间,使得队列变得可用。返回boolean值 表示是否插入成功。 ...

2015-09-14 · 2 min · 426 words · -

LinkedTransferQueue

LinkedTransferQueue LinkedTransferQueue是在JDK1.7时,J.U.C包新增的一种比较特殊的阻塞队列,它除了具备阻塞队列的常用功能外,还有一个比较特殊的transfer方法。 我们知道,在普通阻塞队列中,当队列为空时,消费者线程(调用take或poll方法的线程)一般会阻塞等待生产者线程往队列中存入元素。而LinkedTransferQueue的transfer方法则比较特殊: 当有消费者线程阻塞等待时,调用transfer方法的生产者线程不会将元素存入队列,而是直接将元素传递给消费者; 如果调用transfer方法的生产者线程发现没有正在等待的消费者线程,则会将元素入队,然后会阻塞等待,直到有一个消费者线程来获取该元素。 有一篇论文讨论了其算法与性能: 地址: http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf LinkedTransferQueue 实现了一个重要的接口 TransferQueue, 该接口含有下面几个重要方法: transfer(E e) 若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。 tryTransfer(E e) 若当前存在一个正在等待获取的消费者线程 (使用take()或者poll()函数) ,使用该方法会即刻转移/传输对象元素e; 若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。 tryTransfer(E e, long timeout, TimeUnit unit) 若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉, 若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。 hasWaitingConsumer() 判断是否存在消费者线程 getWaitingConsumerCount() 获取所有等待获取元素的消费线程数量 其实transfer方法在SynchronousQueue的实现中就已存在了,只是没有做为API暴露出来。SynchronousQueue有一个特性:它本身不存在容量,只能进行线程之间的 元素传送。SynchronousQueue在执行offer操作时,如果没有其他线程执行poll,则直接返回false.线程之间元素传送正是通过transfer方法完成的。 有一个使用案例,我们知道ThreadPoolExecutor调节线程的原则是: 先调整到最小线程,最小线程用完后,他会将优先将任务放入缓存队列(offer(task)),等缓冲队列用完了,才会向最大线程数调节。这似乎与我们所理解的线程池模型有点不同。我们一般采用增加到最大线程后,才会放入缓冲队列中,以达到最大性能。ThreadPoolExecutor代码片段: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) ...

2015-09-14 · 1 min · 160 words · -

消息队列/message queue/MQ, CORBA, DCOM, RMI, RPC

消息队列/message queue/MQ, CORBA, DCOM, RMI, RPC http://blog.csdn.net/mr_smile2014/article/details/47452281 消息队列是在消息的传输过程中保存消息的容器,消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。 一、 产生背景: 现今,越来越多的企业面临着各种各样的数据集成和系统整合,CORBA、DCOM、RMI 等RPC中间件技术也应运而生,但由于采用RPC同步处理技术,在性能、健壮性、可扩展性上都存在着诸多缺点。而基于消息的异步处理模型采用非阻塞的调用特性,发送者将消息发送给消息服务器,消息服务器在合适的时候再将消息转发给接收者;发送和接收是异步的,发送者无需等待,二者的生命周期也可以不必相同,而且发送者可以将消息间接传给多个接收者,大大提高了程序的性能、可扩展性及健壮性,这使得异步处理模型在分布式应用上比起同步处理模型更具有吸引力。 分布式对象调用,如CORBA,RMI 和DCOM,提供了一种通讯机制,透明地在异构的分布式计算环境中传递对象请求,这些对象可以位于本地或远程机器。它通过在对象与对象之间提供一种统一的接口,使对象之间的调用和数据共享不再关心对象的位置、实现语言及所驻留的操作系统。这个接口就是面向对象的中间件。 二、传统面向对象中间件的局限性 同步通信: 客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行。 客户和服务对象的生命周期紧密耦合: 客户进程和服务对象进程都必须正常运行,如果由于服务对象崩溃或网络故障导致客户的请求不可达,客户会接收到异常。 三、面向消息的中间件的优越性 消息中间件作为一个中间层软件, 它为分布式系统中创建、发送、接收消息提供了一套可靠通用的方法,实现了分布式系统中可靠的、高效的、实时的跨平台数据传输。消息中间件减少了开发跨平台和网络协议软件的复杂性,它屏蔽了不同操作系统和网络协议的具体细节,面对规模和复杂度都越来越高的分布式系统。它与传统的面向对象中间件相比具有如下优点: 采用异步通信模式: 发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理。 客户和服务对象生命周期的松耦合关系: 客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。 四、消息中间件的技术标准 消息中间件主要有JMS和AMQP两种技术标准; JMS Java关于消息服务的标准是JMS,JMS即Java消息服务 (Java Message Service) 应用程序接口是一个Java平台中关于面向消息中间件 (MOM) 的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,类似于JDBC,需要不同的提供商进行各自的实现。实现JMS标准的软件可以作为Java下的消息中间件服务器。JMS 使您能够通过消息收发服务 (有时称为消息中介程序或路由器) 从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS中的一种类型对象,由两部分组成: 报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带: 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。 AMQP AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

2012-11-15 · 1 min · 56 words · -

ArrayBlockingQueue

ArrayBlockingQueue ArrayBlockingQueue 是一个基于数组的阻塞队列实现,此队列按 FIFO (先进先出) 原则对元素进行排序, 在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。 ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue, 按照实现原理来分析 ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。 队列的头部是在队列中存在时间最长的元素。队列的尾部是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。 这是一个典型的 “有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。 此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了"不平衡性"。 ArrayBlockingQueue 在构造时需要指定容量,并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理 (其实就是通过将 ReentrantLock 设置为 true 来达到这种公平性的: 即等待时间最长的线程会先操作) 。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO (先进先出) 原则对元素进行排序。 PriorityBlockingQueue PriorityBlockingQueue 是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限 (看了一下源码,PriorityBlockingQueue 是对 PriorityQueue 的再次包装,是基于堆数据结构的,而 PriorityQueue 是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError) ,但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元素要具有比较能力。 最后,DelayQueue (基于PriorityQueue来实现的) 是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。 下面是延迟接口: ...

2012-08-26 · 2 min · 366 words · -