java – 向ArrayBlockingQueue添加功能
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 向ArrayBlockingQueue添加功能,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4602字,纯文字阅读大概需要7分钟。
内容图文
![java – 向ArrayBlockingQueue添加功能](/upload/InfoBanner/zyjiaocheng/727/9b9e8ec70dfd486989861f4f14e65278.jpg)
我正在尝试向ArrayBlockingQueue添加功能,特别是我希望队列只保留唯一的元素,即如果条目已经包含在队列中,则不会将条目排入队列.
由于我想要的功能与JCIP第4.4项中Vector的扩展相同,我尝试使用那里的方法实现它.
>通过扩展实现不起作用,因为ArrayBlockingQueue使用包私有ReentrantLock实现其互斥,因此作为扩展类我无法获得对它的引用.即使它确实有效,这也是一种脆弱的方法.
>客户端锁定的实现不起作用,因为没有客户端侧锁定支持.
>组合实现似乎是最初的方式,生成代码,如
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public synchronized boolean offer(E e) {
if (backingQueue.contains(e)) {
return false;
}
return backingQueue.offer(e);
}
@Override
public synchronized E take() throws InterruptedException {
return backingQueue.take();
}
// Other methods...
}
不幸的是,在编写ArrayBlockingQueue时,这种方法会在以下简单场景中产生死锁:
>线程A调用take()并获取同步锁和ArrayBlockingQueue的内部锁.
>线程在看到队列为空时阻塞并释放ArrayBlockingQueue的内部锁.
>线程B使用元素调用offer()但无法获取同步锁,永远阻塞.
我的问题是,如何在不重写ArrayBlockingQueue的情况下实现此功能?
解决方法:
也许一个简单而快速的解决方案是使用java.util.concurrent.ConcurrentMap:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
return add[0] && backingQueue.offer(e);
}
@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}
// Other methods
}
请注意,不需要同步.
编辑:
java.util.concurrent.ConcurrentHashMap的文档说:
/**
* If the specified key is not already associated with a value,
* attempts to compute its value using the given mapping function
* and enters it into this map unless {@code null}. The entire
* method invocation is performed atomically, so the function is
* applied at most once per key. Some attempted update operations
* on this map by other threads may be blocked while computation
* is in progress, so the computation should be short and simple,
* and must not attempt to update any other mappings of this map.
*
* @param key key with which the specified value is to be associated
* @param mappingFunction the function to compute a value
* @return the current (existing or computed) value associated with
* the specified key, or null if the computed value is null
* @throws NullPointerException if the specified key or mappingFunction
* is null
* @throws IllegalStateException if the computation detectably
* attempts a recursive update to this map that would
* otherwise never complete
* @throws RuntimeException or Error if the mappingFunction does so,
* in which case the mapping is left unestablished
*/
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
...
}
我添加了一些额外的检查:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
if (add[0]) {
// make sure that the element was added to the queue,
// otherwise we must remove it from the map
if (backingQueue.offer(e)) {
return true;
}
elements.remove(e);
}
return false;
}
@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}
@Override
public String toString() {
return backingQueue.toString();
}
// Other methods
}
并且…让我们做一些并发测试:
BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100));
int n = 1000;
ExecutorService producerService = Executors.newFixedThreadPool(n);
Callable<Void> producer = () -> {
queue.offer("a");
return null;
};
producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList()));
producerService.shutdown();
System.out.println(queue); // prints [a]
内容总结
以上是互联网集市为您收集整理的java – 向ArrayBlockingQueue添加功能全部内容,希望文章能够帮你解决java – 向ArrayBlockingQueue添加功能所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。