ThreadPoolExecutor란?

ThreadPoolExecutor란 ExecutorService의 구현체 중 하나로, 제출된 작업들을 수행하기 위해 미리 ThreadPool을 만들어 두고, 거기에서 스레드를 가져다 위임한다.
다양한 파라미터를 통해 편하게 설정할 수 있는데,

  1. 기본 스레드 갯수와 최대 스레드 수
  2. 스레드 Factory
  3. 작업들이 대기하는 Blocking Queue
  4. Idle 상태인 스레드를 지우는 시간
  5. Reject된 작업들에 대한 핸들러

위처럼 다양한 요소를 직접 설정할 수 있다.

image

스래드 갯수는 위 흐름도 대로 관리된다. 어떤 상황에서 스레드가 늘어나고 줄어드는지, 또 작업들을 위한 Blocking Queue는 어떻게 사용되는지 알아보자.

1. Thread 갯수는 어떻게 관리되는가?

스레드 갯수는 아래 2개의 파라미터에 의해 결정된다.

  1. corePoolSize
  2. maximumPoolSize

(이 값들은 생성시 파라미터로 넣어 줄 수도 있고, set method가 제공되어서 동적으로 조절 가능하다.)

MaximumPoolSize라는 파라미터가 있으면, 나머지 하나가 Minimum Pool Size일거 같은데 왜 "Core" Pool Size일까?
Core라고 부르는데엔 이유가 있다. ThreadPoolExectorService는 생성시 corePoolSize 갯수만큼 스레드를 만들어 내지 않는다. (이름을 보면 뭔가 기본적으로 갖고 있을 것만 같다.)

새로운 작업이 제출 되었을 때, 만약 실행중인 스레드의 갯수가 corePoolSize 미만이라면, 그때 corePoolSize가 될 때까지 스레드를 만들어 내서 작업을 할당한다! (스레드 팩토리가 이러한 스레드를 만들어낸다.)
굳이 객체가 만들어질 때 미리 core 갯수만큼 스레드를 만들지 않고, 필요할 때 만들어 낸다! 반대로 객체가 만들어질 때 미리 corePoolSize 갯수만큼 준비했으면 좋을 것 같은 상황도 있을 것인데, prestartCoreThread(), prestartAllCoreThread()메서드를 통해 미리 스레드를 준비할 수도 있다.

image

이 상황이 위 그림의 빨간 박스에 나타나있다. execute 실행시 스레드 갯수가 corePoolSize 미만인 경우 addWorker()가 호출된다.



CorePoolSize에 작업이 모두 할당된 다음 추가 요청이 들어오면, MaxSize까지 스레드를 만들어 낼 것만 같지만, 또 그렇지 않다.
CorePoolSize를 초과하는 경우 일단 Blocking Queue에 작업을 추가하고, 이 Queue가 가득차게 되는 경우에만 새로운 스레드를 생성해낸다! 그 상한 값이 maximumPoolSize가 되는 것이다.
그러니까, 일단은 Core Size로 계속해서 버티면서, Blocking Queue에 작업을 밀어 넣다가, Queue까지 가득차게 되면 그때 스레드를 만들어 내는 것이다! 스레드를 미리 생성하지 않고 최대한 미루고 미룬다! 아주 스마트 하다.

image

위 상황이 빨간 박스친 부분인데, 큐가 가득차지 않은 경우 스레드를 늘리지 않고, 큐에 작업만 추가한다.

image

그리고 스레드가 maximumPoolSize 이상 실행중이고, 작업도 모두 할당되고, 큐도 꽉찼다면, 이후 요청은 거절된다! 이때 Reject Handler로 거절시 동작을 결정할 수 있는데, 사용법은 2. 거절된 작업들은 어떻게 처리될까에서 확인하자.

1.1 Blocking Queue의 사이즈는 어떻게 조절하는가?

우리는 "요청이 적은 평소 상황"과 "요청이 몰리는 상황"에 쓰일 스레드 갯수 corePoolSizemaximumPoolSize로 지정할 수 있었다.
그렇다면, 결국 Max Pool Size 갯수만큼의 스레드가 가동되는 것은 Queue가 꽉찬 이후이므로, Application이 "현재 상황이 요청이 몰리는 중인가 그냥 평소 상황인가"를 판단할 때, Queue Capacity가 기준이 될 것이다.
문제는, ThreadPoolExecutor를 생성할 때 명시적인 Queue Capacity 지정이 없는데, Queue를 생성할 때 지정되기 때문이다.
문제는 Queue 구현체마다 Capacity를 설정할 수 있는 것도 있고, 없는 것도 있어서 이 사실을 모르고 막 사용한다면 영원히 corePoolSize 갯수의 스레드만 사용되거나 반대로 걸핏하면 maximumPoolSize 갯수만큼 스레드가 생성될 수 있다.
왜냐하면 Executors가 static factory method로 제공해주는 ThreadPoolExecutor의 구현체들이 사용하는 Queue 중에는 Capacity가 0인 것도 있고, 무한인 것도 있기 때문이다!

간단하게 Blocking Queue의 종류와 구현체들을 알아보자. 그리고 해당 구현체들을 사용하는 Exectors의 메서드를 알아보자.

1.1.1 LinkedBlockingQueue

LinkedBlockingQueue는 무제한의 Capacity를 가진 Blocking Queue이다.
크기 제한이 무제한이기 때문에, 이 큐를 사용하는 ThreadPoolExecutor의 스레드는 갯수는 항상 corePoolSize 만큼만 생성된다! maximumPoolSize 설정은 사실상 무의미한 것이다!
왜냐하면 Queue가 꽉차기 전까지는 스레드 갯수를 corePoolSize만큼만 유지하고, 꽉찬 이후에 maximumPoolSize까지 스레드가 만들어지는 방식이라고 설명했었는데, 이 큐가 꽉찰 일이 없기 때문이다.
그래서 해당 큐를 사용할 때는 항상 corePoolSize 갯수가 스레드의 최대 갯수임을 인지하고 있어야 한다.
또한 대기열 무한정 증가에 주의해야 한다! 만약 작업을 처리하는 쪽에 문제가 생겨 큐에서 작업을 Consume하지 못하는 상황이 발생한다면 Queuee에 작업이 무한히 쌓여 크기가 무한히 커질 수 있다.

정적 메서드 Executors.newFixedThreadPool()과, Executors.newSingleThreadExecutor()를 통해 생성되는 ThreadPoolExecutor가 LinkedBlockingQueue를 사용한다!

image


Executors.newSingleThreadExecutor()인데, 이름값 대로 하나의 스레드만을 사용하기 위해 coreSize 값을 1로 두고 있다. Queue의 크기는 무한하므로 계속 1개의 스레드만 유지될 것이다. maximum이 적혀있으나 무의미하다.

image


Executors.newFixedThreadPool(int nThread)는 입력 받은 nThread 값을 corePoolSize와 maximumPoolSize 값으로 설정한다.

큐의 조작은 우리가 잘 아는 기본적인 Queue 인터페이스가 제공해주는 메서드들로 조작할 수 있다. add, offer, remove, poll은 기존 Queue에서와 똑같이 동작한다. 다만 차이라면 이 4 메서드는 Block되지 않고, Block되는 메서드들도 있다. puttake 메서드는 호출시 블록되어 대기한다. 이 메서드들을 작업을 동기적으로 조절하는 데에 사용할 수 있다. 물론 무한 대기 상황에서는 제한해야 한다. (타임아웃 제공은 X)

1.1.2 SynchronousQueue

SynchronousQueue는 크키가 0인 큐로, 크기가 0이기 때문에 작업이 제출될 때마다 매번 maximumPoolSize 까지 스레드가 생성된다!
아니.. 큐의 사이즈가 0이면 그게 "자료 구조"인가? 했는데, 용도가 다 있다.
SynchronousQueue는 작업 제출시 다른 스레드가 해당 작업을 소비할 때까지 제출한 스레드가 Blocking된다. 혹은 작업을 제출한 스레드가 없는데, 어떤 스레드가 소비하려 한다면, 해당 스레드는 Blocking된다.
이렇게 Blocking되어 반대쪽에 다른 스레드가 나타나기를 기다리는 스레드들은 여러개일 수 있고, "공정성(fair)"이라는 개념에 의해 SynchronousQueue 내부의 자료구조인 Transfer Stack이나 Transfer Queue에 의해 대기된다. 공정성에 대해 간단하게만 설명하자면, 공정성 - fair가 true인 경우 Transfer Queue에 의해 작업들이 대기된다. 먼저 줄 선 사람이 먼저 작업할 수 있으니, 공정하지 않은가? 반대로 fair 값이 false인 경우엔 TransferStack에 의해 관리되는데, 늦게 들어온 스레드가 더 짧은 대기 시간을 가지니 불공평하지 않은가?
이러한 자료구조들에 의해 작업을 제출한 Thread와 소비하려는 Thread는 대기하고 하나씩 수행하게 된다. SynchronousQueue는 중재자 역활만을 할 뿐이다. (fair 값은 기본적으로 false이다.)

이러한 대기로 인해 작업은 자연스럽게 작업이 "동기적으로" 수행되도록 만든다. 순서를 지켜가면서 하나씩 수행될 수 밖에 없기 떄문이다. 이래서 이름이 "SynchronousQueue"이다.
정적 팩터리 메서드 Executors.newCachedThreadPool()를 통해 ThreadPoolExecutor를 생성하면, 내부적으로 이 큐를 사용한다.
이름만 보면 단순히 동기적으로 동작하겠네~ 싶지만, 사이즈가 0인 것을 인지하고 maximumPoolSize 설정에 신경 써야 한다.

image


보면 알겠지만 애초에 core 갯수가 0개이고, max가 Integer MAX_VALUE이다! 대놓고 요청이 들어오는 만큼 계속해서 스레드를 찍어 내겠다는 뜻이다!
newCachedThreadPool라는 메서드 이름만 보면 이런 동작이 예상가지 않을 수 있기 때문에 주의해야 한다.
그럼 왜 메서드 이름이 newCachedThreadPool인가? 작업 갯수만큼 스레드는 만들어지고, 작업을 마치고 나면 KeepAliveTime 시간동안 살아있게 된다. 결국 corePoolSize가 0이므로 언젠간 사라지겠지만, 살아 있는 동안은 이들을 다시 활용할 수 있으므로 "Cached"인것 같다.

1.1.3 ArrayBlockingQueue

ArrayBlockingQueue는 내부적으로 고정 크기의 배열을 사용한다! 드디어 사이즈 조절이 가능한 큐가 등장했다.
큐 사이즈를 의도적으로 조절하고 싶을 때 쓸 수 있겠다!
Queue 사이즈는 어떻게 설정하면 좋을까?
예상되는 "요청이 몰릴 때"와 "평소"를 통해 PoolSize를 조절했을 것이다. 이제, 처리량과 CPU 사용률, 그리고 거부되는 작업이 있어도 되는지, 절대 없어야 하는지 등을 고려하며 Queue 사이즈와 Pool의 사이즈를 조절하면 된다.

  • 작은 Queue Capacity, 큰 Pool Size -> CPU 사용률이 높아지지만, Queue가 작아 거부되는 작업이 있을 수도 있으므로, 처리량이 감소할 수 있다.
    어느 정도 작업이 실패해도 상관 없고, CPU 사용률이 더 중요하다면 큐 사이즈를 작게 설정
  • 큰 Queue Capacity, 작은 Pool Size -> CPU, OS 리소스 사용량 Down, ContextSwitching 오버해드 Down, 그러나 낮은 처리량을 유발할 것이다.
    최대한 core 갯수만큼의 스레드만 사용해 처리할 의도가 있다면 큐 사이즈를 풀 사이즈 보다 크게 설정

1.1.4 BlockingQueue들 주의점 정리

결국 정적 팩터리 메서드로 만들어내는 Executor들이 어떤 Blocking Queue를 가지고 있고, 큐에 따라 스레드 갯수가 다르게 관리될 수 있다는 점을 알아야 한다.

  1. LinkedBlockingQueue
    • Queue 사이즈가 무한이다.
    • 따라서, 항상 corePoolSize 갯수의 스레드만 운용된다. (maximumPoolSize 값이 무의미)
    • Queue가 무한하기 때문에 작업이 무한히 쌓일 수 있다. (소비하는 쪽에서 문제가 생기는 경우)
    • Executors.newFixedThreadPool()와, Executors.newSingleThreadExecutor()에 의해 생성될 수 있음.
  2. SynchronousQueue
    • 큐 Capacity가 0이기 때문에, 스레드가 무한정 만들어질 수 있음에 주의한다.
    • Executors.newCachedThreadPool()에 의해 생성될 수 있음.
  3. ArrayBlockingQueue
    • 크기를 직접 조절할 수 있는 큐이다. Pool Size와 Queue Capacity간의 관계를 이해하고, 상황에 맞게 적절한 값을 사용해야 한다.

1.2 요청이 몰리는 순간이 끝나면, 그 많던 스레드들은 어떻게 될까?

만약 max pool size 갯수만큼 스레드가 만들어진 상태에서, 요청이 몰리는 시간대가 끝나고 다시 서비스가 한산해지면 어떻게 될까? 이제 그렇게 많은 스레드는 필요 없는데, 스레드를 잔뜩 만들어 두었다면 낭비일 수 밖에 없다.
이때 KeepAliveTime이 쓰인다. keepAliveTime 옵션을 통해 간단하게 유휴 상태 스레드를 관리할 수 있다!
ThreadPoolExecutor는 똑똑하게 스레드 갯수가 corePoolSize값 보다 많을 때, keepAliveTime 보다 오랜 시간동안 동안 유휴 상태인 스레드가 있다면, 해당 스레드를 종료 시킨다. 단, corePoolSize 갯수 만큼은 남겨두고 제거한다.

만약 어떤 서비스가 특정 날짜나 시간대에만 요청이 몰리고 평소엔, 별로 많지 않다고 생각해보자. 그때 이 옵션들을 잘 활용할 수 있을 것이다.

  • 평소에는 몇 개의 스레드를 해당 기능을 위해 준비 할건지 -> corePoolSize
  • 요청이 몰릴 때는 최대 몇 개의 스레드가 처리할 것인지 -> maximumPoolSize
  • 물론 큐에 따라

스레드 갯수가 Core Size 갯수와 같아도 오랜 시간 유휴 상태인 스레드를 종료 시키고 싶은 경우 allowCoreThreadTimeOut()을 호출하라. 그러면 coreThread에도 Timeout이 적용된다.
옵션 중 Executor의 정적 팩터리 메서드 newCachedThreadPool()를 통해 생성한다면, keepAliveTime이 60초로 설정되고, 다른 정적 메서드로 생성되는 경우 보통 제한 시간이 없으므로 신경 쓰자. (0으로 설정 되어 있는데, 이것이 제한 시간이 없는 것)

1.2.1 keepAliveTime의 기본 설정 값은 어떻게 되는가?

keepAliveTime의 기본 설정 값은 없다. 생성시 꼭 값을 대입해 줘야 한다.
다만, Executors의 정적 팩터리 메서드를 통해 생성하는 경우 자동으로 값이 할당될 수 있으므로 주의해야 한다.
newCachedThreadPool, newSingleThreadExecutor를 통해 생성하는 경우엔 자동으로 값이 설정된다.
newCachedThreadPool를 사용하는 경우, corePoolSize가 0, maximum Size가 Integer.MAX_VALUE로 설정되는데, 사실상 제한 없이 스레드를 계속 만들겠다는 것.

newSingleThreadExecutor의 경우 core, maximum이 이름 그대로 1개이다.

2. 거절당한 작업들은 어떻게 처리될까?

큐까지 꽉차게 되면 새로운 요청을 처리하기 위해 최대 스레드 갯수만큼의 스레드가 만들어질 수 있다.
최대 갯수만큼의 스레드가 만들어 졌고, 모두 작업이 할당됐으며, 큐까지 꽉차게 된다면 이후 요청은 거절된다! 이때, 거절된 작업들은 어떻게 처리될까?

image

위 메서드는 ThreadPoolExecutor에 작업을 제출하는 execute 메서드이다.
corePoolSize만큼 스레드가 이미 있을 워커 스레드를 더는 늘릴 수 없는 경우 reject가 호출된다.
image

reject는 RejectedExecutionHandler의 rejectedExecution()를 호출한다.
이는 RejectedExecutionHandler 인터페이스가 제공하는 메서드로, 정책마다 다른 구현을 가진다.
제공되는 구현으로는 4가지 정책이 있다. 동작을 살펴보자.

  1. AbortPolicy
  2. CallerRunsPolicy
  3. DiscardPolicy
  4. DiscardOlestPolicy

2.1 AbortPolicy 정책

ThreadPoolExecutor.AbortPolicy 정책은 기본 정책이며, 더 이상 작업을 받을 수 없는 경우 예외를 발생시킨다.

image

위 그림의 빨간 네모칸을 보면, 간단한 실행 정보와 함께 예외를 발생시키는 것을 확인할 수 있다.

2.2 CallerRunsPolicy 정책

ThreadPoolExecutor.CallerRunsPolicy는 이름 그대로 Caller가 Run하는 정책이다. 즉, Executor가 종료 되지 않았더라면, execute를 호출한 스레드가 직접 요청을 수행한다!

image

executor의 상태를 확인한 다음, run을 호출해서 직접 작업을 수행하는 모습을 확인할 수 있다.

2.3 DiscardPolicy 정책

image

깔끔하고 귀엽다. ThreadPoolExecutor.DiscardPolicy는 작업을 버리는 정책이다. 메서드 블럭에 코드가 단 한 글자도 없다. 구현이 깔끔하고 클린 코드 그자체이다.

2.4 DiscardOldestPolicy 정책

잔인한 정책이다. ThreadPoolExecutor.DiscardOldestPolicy는 작업 하나를 Queue에서 poll한다. 즉, 가장 오래 대기한 작업을 꺼내어 없애고, execute를 다시 호출한다. (Executor가 종료되지 않은 경우)

image

너무 잔인하다. 오랜 시간을 고되게 기다려온 작업은 수행되지 못하고 poll당하고 새로운 작업을 위해 execute를 수행한다. 그래서 DiscardOldest이다. 가장 오래된 작업이 버려진다.

3. Thread Pool Hook 메서드

ThreadPoolExecutor는 작업 실행 시점에 적용할 수 있는 Hook Method를 제공해준다. execute의 전, 후와 스레드 풀 종료시 실행되는 메서드를 정의할 수 있다. 전후에 로깅을 할 수도 있을테고, 작업 전 이것저것 준비할 수도 있을 것이다.

일단 ThreadPoolExecutor를 확장하는 클래스를 만들어준 다음, 아래 메서드들을 재정의함으로써 쉽게 동작을 정의해줄 수 있다.

  1. beforeExecute(Thread t, Runnalbe r) : 각 execute의 실행 전에 한번씩 호출된다.
  2. afterExecute(Thread t, Throwable r) : 각 execute의 실행 후에 한번씩 호출된다.
    인자의 Throwable은 무엇일까? 예외가 발생한 경우 이 afterExecute()에서 예외를 처리해줄 수 있다. 예외가 발생하지 않았다면 null이 들어있다.
  3. terminated() : 스레드 풀이 완전히 종료된 뒤에 호출된다.
    앞선 두 메서드는 작업마다 호출됐고, 이 메서드는 한번만 호출되니 마무리 작업을 정의할 수 있을 것이다. 뒤에 나올 TIDYING 상태와 TERMINATED 상태의 중간에 호출된다.

4. ThreadPoolExecutor의 생명 주기와 상태

ThreadPoolExecutor는 자체적으로 생명 주기와 상태를 갖고, 상태별로 작업 스레드 풀의 동작이 결정된다.

image

상태들을 설명하기 위해 그림 먼저 보자. 그림에서 빨간색 글씨로 적힌 내용들이 Exector의 상태를 나타낸다.

4.1 상태

  1. RUNNING : 새로운 작업을 수용하고 대기중인 작업을 처리한다.
  2. SHUTDOWN : 스레드 풀을 종료하는 상태 + 현재 대기 중인 작업은 처리하지만, 새 작업을 수용하지 않는다. 위 그림과 같이 현재 대기중인 작업들이 있다면 대기한다.
    shutdown()의 호출로 RUNNING 상태에서 SHUTDOWN 상태로 전환할 수 있다. (왼편의 Main에서 아래로 이어지는 선 참고)
  3. STOP : 스레드 풀을 종료하는 상태 + 현재 진행중인 작업을 중단한다! 새 작업도 수용하지 않는다.
    위 그림과 같이 STOP 상태인 경우 thread.interrupt()를 호출한다!
    SHUTDOWN 상태에서 shutdownNow() 메서드가 호출되면 STOP 상태가 된다.
  4. TIDYING : SHUTDOWN이나 STOP 상태에서 큐와 풀이 모두 비게 되면 이 상태로 접어든다.모든 작업이 종료된 상태이다. worker의 갯수를 저장하는 workerCount가 0이 되면 terminated() 훅 메서드를 실행한다.
  5. TERMINATED : terminated()가 완료된 이후 이 상태가 된다. 이제 스레드 풀이 종료된 상태이다.