스레드의 동기화란?
멀티스레드 프로세스의 경우 여러 스레드가 같은 프로세스 내의 자원을 공유해서 작업하기 때문에 서로의 작업에 영향을 주게 된다. 따라서 한 스레드가 특정 작업을 끝마치기 전까지 다른 스레드에 의해 방해받지 않도록 하는 것이 필요하다. 그래서 도입된 개념이 바로 임계영역과 잠금이다. 공유 데이터를 사용하는 영역을 임계영역으로 지정하고, 공유데이터가 가지고 있는 lock을 획득한 단 하나의 쓰레드만 이 영역 내의 코드를 수행할 수 있게 한다. 이렇게 한 스레드가 진행 중인 작업을 다른 스레드가 간섭하지 못하도록 막는 것이 쓰레드의 동기화, synchronization이라고 한다.
임계 영역은 락을 얻은 단 하나의 쓰레드만 출입이 가능하며, 객체 한개에 하나의 락이 할당된다. 임계 영역이 많을수록 성능이 떨어지는데 한 번에 한 스레드만 접근이 가능하기 때문이다. 임계영역은 멀티스레드 프로그램의 성능을 좌우하기 때문에 가능하면 메서드 전체에 락을 거는 것보다 synchronized블록으로 임계 영역을 최소화해서 보다 효율적인 프로그램이 되도록 노력해야 한다.
임계영역을 지정하는 방법은 두 가지가 있다.
- 메서드 전체를 임계영역으로 지정
public synchronized void calSum() {
// .. 임계 영역(ciritcal section)
}
- 특정한 영역을 임계영역으로 지정
synchronized (객체의 참조 변수) {
}
lock의 획득과 반납은 자동적으로 이루어지므로 개발자는 임계 영역만 설정해 주면 된다.
아래 예제는 synchronized 키워드를 이용해 withdraw 함수를 임계영역으로 설정한 코드다. 만약 임계영역을 설정해주지 않는다면 동시에 여러 개의 스레드가 balance 라는 값에 접근하여 음수가 출력되는 결과가 나올 수 있다. ( 반드시 그런 것은 아니다 ) 이 때 주의할 점은 인스턴스 변수 balance 의 접근제어자가 private 라는 것이다. 만약 private 가 아니라면 아무리 임계설정을 해주더라도 외부에서 접근해서 값을 변경할 수 있기 때문에 동기화 설정과 별개로 값이 변경되는 것을 막을 수 없다. 따라서 공유자원으로 사용할 변수에는 private 선언을 해줘야 한다.
package com.example.thread;
import com.intellij.diagnostic.RunnablesListener;
public class SynchronizedTest {
public static void main(String[] args) {
RunnableA runnableA = new RunnableA();
RunnableA runnableB = new RunnableA();
Thread thread = new Thread(runnableA);
Thread thread_2 = new Thread(runnableB);
thread.start();
thread_2.start();
}
}
class Account {
private int balance = 1000; // private 로 해야 sync한 의미가 있음 ( public 이면 외부에서 참조해서 바꿀 수 있음 . 의미가 없다 )
public synchronized int getBalance() { // 읽을 때도 동기화를 붙여주는게 좋음
return balance;
}
//public void withdraw(int money) { // snychronized 로 메서드를 동기화 -> 한번에 한 쓰레드만 접근 가능
public synchronized void withdraw(int money) { // snychronized 로 메서드를 동기화 -> 한번에 한 쓰레드만 접근 가능
if(balance >= money) {
try {
Thread.sleep(1000); // 결과를 보기 위해 추가
} catch (InterruptedException e) {
System.out.println(" interrupt Exception 발생");
}
balance -= money;
}
}
}
class RunnableA implements Runnable {
Account acc = new Account();
@Override
public void run() {
while(acc.getBalance() > 0) {
//int money = (int)(Math.random() * 3 + 1) * 100;
int money = 100;
acc.withdraw(money); // 출금
System.out.println("balance : " + acc.getBalance() + ", Thread name => " + Thread.currentThread().getName());
System.out.println("===================" + ", Thread name => " + Thread.currentThread().getName());
}
}
}
동기화의 단점을 극복하기 위한 notify()와 wait()
동기화를 설정하면 공유데이터를 보호할 수 있지만, 특정 스레드가 객체의 락을 가진 상태로 오랜 시간을 보낼 경우 프로그램 효율이 떨어진다는 단점을 가진다. 이런 상황을 개선하기 위해 고안된 것이 바로 wait() 와 notify() 이다. 동기화된 임계영역의 코드를 수행하다가 작업을 더 이상 진행할 상황이 아니면 일단 wait()를 호출하여 쓰레드가 락을 반납하고 기다리게 한다. 그러면 다른 쓰레드가 락을 얻어 해당 객체에 대한 작업을 수행할 수 있게 된다. 나중에 작업을 진행할 수 있는 상황이 되면 notify()를 호출해서 작업을 중단했던 쓰레드가 다시 락을 얻어 작업을 진행할 수 있게 한다. 이렇게 wait()에 의해 lock을 반납했다가 다시 lock을 얻어서 임계 영역에 들어오는 것을 재진입이라고 한다. 이때 오래 기다린 스레드가 락을 얻는다는 보장이 없다.
객체에는 일종의 대기실 개념이 있는데 이를 waiting pool라고 표현한다. wait()가 호출되면 실행 중이던 스레드는 해당 객체의 waiting pool 로 이동하여 notify() 가 호출될 때 까지 기다린다. notifyAll()을 호출하면 기다리고 있는 모든 쓰레드에게 통보( waiting pool 에 있는 쓰레드에게 통보되는 것으로 notifyAll()이 호출된 객체의 쓰레드들만 깨운다) 가 되지만 그래도 lock을 얻을 수 있는 것은 하나의 쓰레드뿐이고 나머지 쓰레드는 통보를 받긴 하지만 lock을 얻지 못하면 다시 lock을 기다리는 신세가 된다.
wait()와 notify()는 특정 객체에 대한 것이므로 Object 클래스에 아래와 같이 정의되어 있다.
void wait() // 인자가 없는 경우 notify()가 호출될 때 까지 계속 기다린다
void wait(long timeout) // 인자가 있는 경우 지정된 시간이 지나면 자동으로 notify()가 호출 된다
void wait(long timeout, int nanos)
void notify()
void notifyAll()
일반적으로 notifyAll()이 notify() 보다 선호되는데 이것은 두 가지 이유로 간추릴 수 있다.
첫 번째로, wait()하는 스레드는 제 각각 여러 이유로 대기하지만 동일한 waiting pool에 저장된다. 이때 notify()를 수행하면 조건 변수에 대기하고 있는 스레드 중 하나가 깨어나는데 해당 스레드가 대기에서 빠져나갈 수 있는 조건이 부합하는지 아닌지 알 수 없다. 만약 대기에서 깨어난 쓰레드가 여전히 조건에 부합하지 못한다면 다시 대기하게 된다. 하지만 notifyAll()을 사용할 경우 모든 쓰레드가 대기에서 깨어나기 때문에 애플리케이션이 정상적으로 제 기능을 발휘할 수 있다.
두 번째는 경쟁상태 관련 문제다. notify()는 하나의 스레드만 깨우기 때문에 모든 스레드는 대기에서 빠져나오기 위해 경쟁상태에 있게 되는데, 이것은 어떤 스레드가 깨어날지 모르는 불확실한 상태에 있다는 것을 의미한다. nofiyAll()을 사용하면 모든 스레드가 공정하게 깨어나기 때문에 경쟁상태가 감소된다.
이것은 어디까지나 일반적이지 무조건적인 것은 아니다. 만약 객체별 waiting pool을 따로 관리하고, 어떤 스레드를 깨워야 할지 조건에 따라 notify()를 호출할 수 있다면 notifyAll() 보다 notify()가 좋은 선택이 될 수 있다.
기아 상태와 경쟁 상태를 해결하기 위해 Lock과 Condition을 사용하자 ( 슬프게도 완벽하게 해결할 순 없다 )
waiting pool에서 대기 중인 스레드가 계속 notify()를 받지 못하고 오랫동안 기다리는 것을 기아 현상(starvation)이라고 표현한다. 이 현상을 막기 위해 notifyAll()을 사용할 수 있다. notifyAll()을 사용하면 기아현상을 막을 수는 있지만, 불필요한 스레드까지 통지 알림을 받아 lock을 얻기 위한 경쟁상태(race condition)에 들어가는데 이 경쟁 상태를 개선하기 위해 필요한 스레드를 구별해서 통지하는 것이 필요하다. 이때 Lock과 Condition을 사용하여 wait()¬ify()로는 불가능한 선별적인 통지를 할 수 있다.
java.util.concurrent.locks 패키지에 있는 lock클래스들은 다음과 같다
ReentrantLock // 재진입이 가능한 Lock. 가장 일반적인 배타 lock
ReentrantReadWriteLock // 읽기에는 공유적이고 쓰기에는 배타적인 lock
StampedLock // ReentrantReadWriteLock에 낙관적인 lock의 기능을 추가
- ReentrantLock
가장 일반적인 lock으로, 특정 조건에서는 lock을 풀고 나중에 다시 lock을 얻고 임계영역으로 들어와서 이후의 작업을 수행할 수 있다.
ReentrantLock의 생성자는 다음과 같다.
ReentrantLock()
ReenTrantLock(boolean fair)
생성자의 인자로 true를 주면 lock이 풀렸을 때 가장 오래 기다린 스레드가 lock을 획득할 수 있게 공정하게 처리한다. 단, 성능은 떨어진다. 대부분의 경우 굳이 공정하게 처리하지 않아도 문제가 되지 않으므로 공정함보다는 성능을 선택한다.
synchronized 블록의 경우 자동으로 lock의 잠금과 해제가 관리되지만, ReentrantLock과 같은 lock클래스들은 수동으로 lock을 잠그고 해제해야 한다. 따라서 다음과 같은 함수들이 제공된다.
void lock() // lock 을 잠굼
void unlock() // lock 을 해지함
boolean isLocked() // lock 이 잠겼는지 확인함
임계영역은 lock()과 unlock() 사이에 걸린다.
lock.lock();
// 임계 영역
lock.unlock();
일반적으로 unlock()은 try-finally문을 사용한다. 임계 영역 내에서 예외가 발생하거나 return문을 빠져나가게 되면 lock이 풀리지 않을 수 있기 때문이다.
lock()을 사용하면 lock을 얻을 때까지 스레드를 블락시키므로 스레드의 응답성이 나빠질 수 있다. 이럴 때 tryLock()을 사용하면 지정된 시간 동안 lock을 얻지 못하면 다시 작업을 시도할 것인지 포기할 것인지를 사용자가 결정할 수 있게 코드를 작성할 수 있다.
boolean tryLock()
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException // interrupted을 제공하므로 interrupt()에 의해 작업이 취소될 수 있도록 할 수 있음
- ReentrantReadWriteLock
읽기를 위한 lock과 쓰기를 위한 lock을 제공한다. ReentrantReadWriteLock은 읽기 lock이 걸려 있으면 다른 스레드가 읽기 lock을 중복해서 걸고 읽기를 수행할 수 있다. 읽기는 내용을 변경하지 않으므로 동시에 여러 쓰레드가 접근해도 문제 되지 않는다. 그러나 읽기 lock이 걸린 상태에서 쓰기 lock을 거는 것은 허용되지 않는다.
- StampedLock
lock을 걸거나 해지할 때 스탬프라는 정수값을 사용하며 읽기와 쓰기를 위한 Lock 외에 'optimistic reading lock(낙관적 읽기)'가 추가된 것이다. 읽기 lock이 걸려있으면 쓰기 lock을 얻기 위해서는 읽기 lock이 풀릴 때까지 기다려야 하는데 비해 optimistic reading lock은 쓰기 lock에 의해 바로 풀린다. 그래서 낙관적 읽기에 실패하면 읽기 lock을 얻어서 다시 읽어 와야 한다. 무조건 읽기 lock을 걸지 않고, 쓰기와 읽기가 충돌할 때만 쓰기가 끝난 후에 읽기 lock을 거는 것이다.
int getBalance() {
long stamp = lock.tryOptimisticRead(); // 낙관적 읽기 lock을 걸음
int curBalance = this.balance; // 공유데이터인 balance 를 가져옴
if(lock.validate(stamp)) { // 쓰기 lock에 의해 낙관적읽기lock이 풀렸는지 확인
stamp = lock.readLock(); // lock이 풀렸으면 읽기 lock을 얻으려고 기다림
try {
curBalance = this.balance; // 공유 데이터를 다시 읽어옴
} finally {
lock.unlockedRead(stamp); // 읽기 lock을 품
}
}
return curBalance; // 낙관적 읽기 lock이 풀리지 않으면 곧바로 읽어온 값을 반환
}
- Condition
Condition은 이미 생성된 lock으로부터 newCondition()을 호출해서 생성한다.
private ReentrantLock lock = new ReentrantLock();// lock을 생성
//lock으로 condition을 생성
private Condition forCook = lock.newCondition();
private Condition forCust = lock.newCondition();
wait()와 notify()의 경우 동일한 waiting pool을 공유하여 지정된 스레드로 notify() 지정이 불가능했으나, condition을 사용하면 스레드별로 지정하여 wait()와 notify()가 가능하다. Condition에서 제공하는 await()와 signal()을 사용하면 된다.
Object | Condition |
void wait() | void await() void awaitUninterruptibly() |
void wait(long timeout) | boolean await(long time, TimeUnit unit) long awaitNanos(long nanosTimeout) boolean awaitUntil(Date deadline) |
void notify() | void signal() |
void notifyAll() | void signalAll() |
작업을 원자화하는 키워드 Volatile
멀티 코어 프로세서에서는, 코어마다 별도의 캐시를 가지고 동작한다. 코어는 메모리에서 읽어온 값을 캐시에 저장하고, 캐시에서 값을 읽어서 작업한다. 다시 같은 값을 읽어올 때는 먼저 캐시에 있는지 확인하고 없을 때만 메모리에서 읽어 온다. 그러다 보니 도중에 메모리에 저장된 변수의 값이 변경되었는데도 캐시에 저장된 값이 경신되지 않아서 메모리에 저장된 값과 캐시의 값이 다른 경우가 발생한다.
이럴 때 변수 앞에 volatile을 붙이면 캐시가 아니라 메모리에서 값을 읽어오기 때문에 캐시와 메모리 간의 값의 불일치를 해결할 수 있다.
변수에 volatile을 붙이는 대신에 synchronized블록을 사용해도 같은 효과를 얻을 수 있다. 스레드가 snychronized블럭으로 들어갈 때와 나올 때, 캐시와 메모리간의 동기화가 이루어지기 때문에 값의 불일치가 해소되기 때문이다. ( 단, 상수는 volatile 키워드를 붙일 수 없다. final과 volatile 둘 중에 하나만 선택해서 붙여야 하는데 애초에 상수는 값이 변경될 수 없기 때문에 당연한 얘기다.)
JVM은 데이터를 4byte 단위로 처리한다. 따라서 int와 int보다 작은 타입들은 한 번에 읽거나 쓰는 것이 가능하다. 즉, 단 하나의 명령어로 읽거나 쓰기가 가능하다는 뜻이다. 하나의 명령어는 더 이상 나눌 수 없는 최소의 작업단위 이므로 작업의 중간에 다른 쓰레드가 끼어들 틈이 없다. 그러나 long이나 double은 크기가 8byte로 하나의 명령어로 값을 읽거나 쓸 수 없다. 이럴 때도 volatile을 사용하면 해당 변수를 원자화(작업을 더 이상 나눌 수 없게 함)할 수 있다.
volatile long sharedVal;
volatile double shareedVal;
주의해야 할 점은, volatile을 쓴다고 해서 동기화가 되는 것은 아니다. 즉 임계영역이 지정되는 것이 아니므로 sychronized나 lock은 별도로 줘야 한다.
fork & join 프레임 워크
이 프레임워크는 하나의 작업을 작은 단위로 나눠서 여러 스레드가 동시에 처리하는 것을 쉽게 만들어 준다.
RecursiveAction - 반환 값이 없는 작업을 구현할 때 사용
RecursiveTask - 반환할 값이 있는 작업을 구현할 때 사용
두 클래스 모두 compute()라는 추상 메서드를 가지고 있는데, 상속을 통해 추상 메서드를 구현하기만 하면 된다.
compute()를 구현할 때는 수행할 작업과 작업을 어떻게 나눌지에 대해서도 알려줘야 한다.
public abstract class RecursiveAction extends ForkJoinTask<Void> {
protected abstract void compute(); // 상속을 통해 이 메서드를 구현함
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
V result;
protected abstract V compute(); // 상속을 통해 이 메서드를 구현함
}
그다음에는 invoke()를 통해 작업을 시작할 수 있다.
ForkJoinPool pool = new ForkJoinPool();// 쓰레드 풀을 생성
SumTask task = new SumTask(from, to); // 수행할 작업을 생성한다. 여기서 SumTask는 RecursiveTask를 상속받았다.
Long result = pool.invoke(task); // invoke()를 호출해서 작업을 시작한다.
ForkJoinPool은 fork&join 프레임워크에서 제공하는 스레드풀(thread pool)로, 지정된 수의 스레드를 생성해서 미리 만들어 놓고 반복해서 재사용할 수 있게 한다. 따라서 쓰레드를 반복해서 생성하지 않아도 되고, 너무 많은 스레드가 생성되어 성능이 저하되는 것을 막아주는 장점이 있다.
스레드풀은 스레드가 수행해야 하는 작업이 담긴 큐를 제공하며, 각 스레드는 자신의 작업 큐에 담긴 작업을 순서대로 처리한다.
- compute(), fork(), join()
compute()가 처음 호출되면, 작업할 범위를 나눠서 한쪽에는 fork()를 호출해서 작업 큐에 저장한다. 하나의 스레드는 compte()를 재귀호출하면서 작업을 계속해서 반으로 나누고, 다른 쓰레드는 fork()에 의해 작업 큐에 추가된 작업을 수행한다.
fork()가 호출되어 작업큐에 추가된 작업 역시 compute()에 의해 더 이상 나눌 수 없을 때까지 반복해서 나뉘고 자신의 작업 큐가 비어 있는 쓰레드는 다른 스레드의 작업 큐에서 작업을 가져와서 수행한다. 이 과정을 통해 여러 스레드가 골고루 작업을 나누어 처리하게 된다.
join()은 해당 작업의 수행이 끝날 때까지 기다렸다가 수행이 끝나면 그 결과를 반환한다. fork()가 비동기 메서드라 작업의 결과를 기다리지 않는 것과 반대로, join()은 작업의 수행을 기다리므로 동기 메서드에 해당한다.
'Dev > Java' 카테고리의 다른 글
Garbage Collector 과JVM 메모리 구조 ( OOP를 제대로 알기 ) (1) | 2024.05.01 |
---|---|
직렬화(Serialization) - 객체를 주고 받는 방법 (0) | 2024.04.15 |
Thread 를 알아보자 - mulit vs single, PrioirtyQueue, ThreadGroup, 실행제어( 2 / 3 ) (0) | 2024.04.01 |
Thread 를 알아보자 - 프로세스와 쓰레드, 태스킹과 쓰레딩, start() vs run() ( 1 / 3 ) (1) | 2024.03.25 |
열거형(enums) - 열거형을 통해 사용할 수 있는 방법들 (0) | 2024.03.25 |