博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入理解java中的底层阻塞原理及实现
阅读量:6681 次
发布时间:2019-06-25

本文共 13296 字,大约阅读时间需要 44 分钟。

  谈到阻塞,相信大家都不会陌生了。阻塞的应用场景真的多得不要不要的,比如 生产-消费模式,限流统计等等。什么 ArrayBlockingQueue, LinkedBlockingQueue, DelayQueue...  都是阻塞队列的实现啊,多简单!

  阻塞,一般有两个特性很亮眼:1. 不耗cpu的等待;2. 线程安全;

  额,要这么说也ok的。毕竟,我们遇到的问题,到这里就够解决了。但是有没有想过,这容器的阻塞又是如何实现的呢?

好吧,翻开源码,也很简单了:(比如ArrayBlockingQueue的take、put....)

// ArrayBlockingQueue    /**     * Inserts the specified element at the tail of this queue, waiting     * for space to become available if the queue is full.     *     * @throws InterruptedException {
@inheritDoc} * @throws NullPointerException {
@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 阻塞的点 notFull.await(); enqueue(e); } finally { lock.unlock(); } } /** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {
@inheritDoc} * @throws NullPointerException {
@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 阻塞的点 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 阻塞的点 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

看来,最终都是依赖了AbstractQueuedSynchronizer类(著名的AQS)的await方法,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?

java的代码总是好跟踪的:

// AbstractQueuedSynchronizer.await()

/**         * Implements interruptible condition wait.         * 
    *
  1. If current thread is interrupted, throw InterruptedException. *
  2. Save lock state returned by {
    @link #getState}. *
  3. Invoke {
    @link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
  4. Block until signalled or interrupted. *
  5. Reacquire by invoking specialized version of * {
    @link #acquire} with saved state as argument. *
  6. If interrupted while blocked in step 4, throw InterruptedException. *
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 此处进行真正的阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

如上,可以看到,真正的阻塞工作又转交给了另一个工具类: LockSupport的 park 方法了,这回跟锁扯上了关系,看起来已经越来越接近事实了:

// LockSupport.park()

/**     * Disables the current thread for thread scheduling purposes unless the     * permit is available.     *     * 

If the permit is available then it is consumed and the call returns * immediately; otherwise * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * *

    *
  • Some other thread invokes {
    @link #unpark unpark} with the * current thread as the target; or * *
  • Some other thread {
    @linkplain Thread#interrupt interrupts} * the current thread; or * *
  • The call spuriously (that is, for no reason) returns. *
* *

This method does not report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. * * @param blocker the synchronization object responsible for this * thread parking * @since 1.6 */ public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }

看得出来,这里的实现就比较简洁了,先获取当前线程,设置阻塞对象,阻塞,然后解除阻塞。

好吧,到底什么是真正的阻塞,我们还是不得而知!

UNSAFE.park(false, 0L); 是个什么东西? 看起来就是这一句起到了最关键的作用呢!但由于这里已经是 native代码,我们已经无法再简单的查看源码了!那咋整呢?

 那不行就看C/C++的源码呗,看一下parker的定义(park.hpp):

class Parker : public os::PlatformParker {private:  volatile int _counter ;  Parker * FreeNext ;  JavaThread * AssociatedWith ; // Current associationpublic:  Parker() : PlatformParker() {    _counter       = 0 ;    FreeNext       = NULL ;    AssociatedWith = NULL ;  }protected:  ~Parker() { ShouldNotReachHere(); }public:  // For simplicity of interface with Java, all forms of park (indefinite,  // relative, and absolute) are multiplexed into one call.  c中暴露出两个方法给java调用  void park(bool isAbsolute, jlong time);  void unpark();  // Lifecycle operators  static Parker * Allocate (JavaThread * t) ;  static void Release (Parker * e) ;private:  static Parker * volatile FreeList ;  static volatile int ListLock ;};

 

那 park() 方法到底是如何实现的呢? 其实是继承的 os::PlatformParker 的功能,也就是平台相关的私有实现,以 linux 平台实现为例(os_linux.hpp):

// linux中的parker定义class PlatformParker : public CHeapObj
{ protected: enum { REL_INDEX = 0, ABS_INDEX = 1 }; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [2] ; // one for relative times and one for abs. public: // TODO-FIXME: make dtor private ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr()); assert_status(status == 0, status, "cond_init rel"); status = pthread_cond_init (&_cond[ABS_INDEX], NULL); assert_status(status == 0, status, "cond_init abs"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _cur_index = -1; // mark as unused }};

 

 看到 park.cpp 中没有重写 park() 和 unpark() 方法,也就是说阻塞实现完全交由特定平台代码处理了(os_linux.cpp):

// park方法的实现,依赖于 _counter, _mutex[1], _cond[2]void Parker::park(bool isAbsolute, jlong time) {  // Ideally we'd do something useful while spinning, such  // as calling unpackTime().  // Optional fast-path check:  // Return immediately if a permit is available.  // We depend on Atomic::xchg() having full barrier semantics  // since we are doing a lock-free update to _counter.  if (Atomic::xchg(0, &_counter) > 0) return;  Thread* thread = Thread::current();  assert(thread->is_Java_thread(), "Must be JavaThread");  JavaThread *jt = (JavaThread *)thread;  // Optional optimization -- avoid state transitions if there's an interrupt pending.  // Check interrupt before trying to wait  if (Thread::is_interrupted(thread, false)) {    return;  }  // Next, demultiplex/decode time arguments  timespec absTime;  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all    return;  }  if (time > 0) {    unpackTime(&absTime, isAbsolute, time);  }  // Enter safepoint region  // Beware of deadlocks such as 6317397.  // The per-thread Parker:: mutex is a classic leaf-lock.  // In particular a thread must never block on the Threads_lock while  // holding the Parker:: mutex.  If safepoints are pending both the  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.  ThreadBlockInVM tbivm(jt);  // Don't wait if cannot get lock since interference arises from  // unblocking.  Also. check interrupt before trying wait  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {    return;  }  int status ;  if (_counter > 0)  { // no wait needed    _counter = 0;    status = pthread_mutex_unlock(_mutex);    assert (status == 0, "invariant") ;    // Paranoia to ensure our locked and lock-free paths interact    // correctly with each other and Java-level accesses.    OrderAccess::fence();    return;  }#ifdef ASSERT  // Don't catch signals while blocked; let the running threads have the signals.  // (This allows a debugger to break into the running thread.)  sigset_t oldsigs;  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);#endif  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);  jt->set_suspend_equivalent();  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()  assert(_cur_index == -1, "invariant");  if (time == 0) {    _cur_index = REL_INDEX; // arbitrary choice when not timed    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;  } else {    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;    if (status != 0 && WorkAroundNPTLTimedWaitHang) {      pthread_cond_destroy (&_cond[_cur_index]) ;      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());    }  }  _cur_index = -1;  assert_status(status == 0 || status == EINTR ||                status == ETIME || status == ETIMEDOUT,                status, "cond_timedwait");#ifdef ASSERT  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);#endif  _counter = 0 ;  status = pthread_mutex_unlock(_mutex) ;  assert_status(status == 0, status, "invariant") ;  // Paranoia to ensure our locked and lock-free paths interact  // correctly with each other and Java-level accesses.  OrderAccess::fence();  // If externally suspended while waiting, re-suspend  if (jt->handle_special_suspend_equivalent_condition()) {    jt->java_suspend_self();  }}// unpark 实现,相对简单些void Parker::unpark() {  int s, status ;  status = pthread_mutex_lock(_mutex);  assert (status == 0, "invariant") ;  s = _counter;  _counter = 1;  if (s < 1) {    // thread might be parked    if (_cur_index != -1) {      // thread is definitely parked      if (WorkAroundNPTLTimedWaitHang) {        status = pthread_cond_signal (&_cond[_cur_index]);        assert (status == 0, "invariant");        status = pthread_mutex_unlock(_mutex);        assert (status == 0, "invariant");      } else {        // must capture correct index before unlocking        int index = _cur_index;        status = pthread_mutex_unlock(_mutex);        assert (status == 0, "invariant");        status = pthread_cond_signal (&_cond[index]);        assert (status == 0, "invariant");      }    } else {      pthread_mutex_unlock(_mutex);      assert (status == 0, "invariant") ;    }  } else {    pthread_mutex_unlock(_mutex);    assert (status == 0, "invariant") ;  }}
从上面代码可以看出,阻塞主要借助于三个变量,_cond, _mutex, _counter, 调用linux系统的 pthread_cond_wait, pthread_mutex_lock, pthread_mutex_unlock (一组POSIX标准的阻塞接口)等平台相关的方法进行阻塞了! 而 park.cpp中,则只有  Allocate、Release 等的一些常规操作!
// 6399321 As a temporary measure we copied & modified the ParkEvent::// allocate() and release() code for use by Parkers.  The Parker:: forms// will eventually be removed as we consolide and shift over to ParkEvents// for both builtin synchronization and JSR166 operations.volatile int Parker::ListLock = 0 ;Parker * volatile Parker::FreeList = NULL ;Parker * Parker::Allocate (JavaThread * t) {  guarantee (t != NULL, "invariant") ;  Parker * p ;  // Start by trying to recycle an existing but unassociated  // Parker from the global free list.  // 8028280: using concurrent free list without memory management can leak  // pretty badly it turns out.  Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");  {    p = FreeList;    if (p != NULL) {      FreeList = p->FreeNext;    }  }  Thread::SpinRelease(&ListLock);  if (p != NULL) {    guarantee (p->AssociatedWith == NULL, "invariant") ;  } else {    // Do this the hard way -- materialize a new Parker..    p = new Parker() ;  }  p->AssociatedWith = t ;          // Associate p with t  p->FreeNext       = NULL ;  return p ;}void Parker::Release (Parker * p) {  if (p == NULL) return ;  guarantee (p->AssociatedWith != NULL, "invariant") ;  guarantee (p->FreeNext == NULL      , "invariant") ;  p->AssociatedWith = NULL ;  Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");  {    p->FreeNext = FreeList;    FreeList = p;  }  Thread::SpinRelease(&ListLock);}

 

  综上源码,在进行阻塞的时候,底层并没有(并不一定)要用while死循环来阻塞,更多的是借助于操作系统的实现来进行阻塞的。当然,这也更符合大家的猜想!

      从上的代码我们也发现一点,底层在做许多事的时候,都不忘考虑线程中断,也就是说,即使在阻塞状态也是可以接收中断信号的,这为上层语言打开了方便之门。

    如果要细说阻塞,其实还远没完,不过再往操作系统层面如何实现,就得再下点功夫,去翻翻资料了,把底线压在操作系统层面,大多数情况下也够用了!

  

转载地址:http://slnao.baihongyu.com/

你可能感兴趣的文章
[洛谷P3709]大爷的字符串题
查看>>
通过映射关系 动态转义为统一格式的数据 (支持 JSON 和 XML )
查看>>
ajax跨域解决方案(服务端仅限java)
查看>>
Shell 文本处理三剑客之grep
查看>>
如何写出让人看了恶心的代码
查看>>
http状态码
查看>>
好记性不如烂笔杆-android学习笔记<十五> GridView简单用法
查看>>
最短路径
查看>>
表格相关技巧(双击启动事件、取得行号、定义表格的读写属性)
查看>>
ubuntu server vsftpd 虚拟用户及目录
查看>>
GCD多线程使用
查看>>
[转载] 格式化字符串漏洞原理介绍
查看>>
python小项目之微信远程控制
查看>>
Mysql本地安装多实例后启动遇到的问题
查看>>
用 RPM 打包软件,第 1 部分
查看>>
POJ题目(转)
查看>>
js使用闭包时,内部函数是直接访问外部函数的实际变量而非复制一份新变量...
查看>>
P3622 [APIO2007]动物园
查看>>
HBase原理和设计
查看>>
map通过value获取对应key
查看>>