-
[번역] Java 8의 동시성(Concurrency)개발언어/Java 2019. 5. 1. 23:35
부제 : Java8의 lambda에서 가장 중요한 원자적 변수와 동시성Map
이 튜토리얼은 동시성 API의 핵심 개념인 Atomic Variables and Concurrent Maps 에 대해서 이야기하고자 합니다. 두 개념 모두 Java 8에서 람다를 설명하는데 있어서 정말 중요한 개념입니다.
1. AtomicInteger
패키지 java.concurrent.atomic은 원자적 연산을 수행하기 위해 여러 유용한 연산들을 포함하고 있습니다. 연산이 원자적이라는 것은 `synchronized` 키워드나 lock을 사용하지 않고도 멀티스레드로 병렬 연산을 안전하게 실행할 수 있을 때를 말합니다.
내부적으로, 원자적인 클래스들은 comapre of swap(CAS)라는 원자적 명령어를 빈번하게 사용합니다. 이 명령어들은 보통 lock을 통해 동기화(synchronizing)하는 것 보다 훨씬 더 빠릅니다. 그래서 내가 할 수 있는 어드바이스는 동시에 단일 가변 객체(single mutable variable)를 수정할 경우 lock을 쓰는것 보다 원자적 클래스를 사용하는게 더 낫다고 충고하고 싶습니다. 지금부터 AtomicInteger를 이해하기 위해 예를 들면서 설명하도록 하겠습니다.
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(atomicInt::incrementAndGet)); stop(executor); System.out.println(atomicInt.get()); // => 1000
위의 코드에서 Integer를 AtomicInteger로 대체하여 사용함으로써 우리는 변수 접근을 동기화 하지 않고도 스레드 세이프(thread safe : 어떤 값에 수정이 일어날 때 다른 작업이 영향을 미치지 못하는 상태) 한 방법으로 number를 동시에 증가시킬 수 있습니다. increamentAndGet() 메서드는 원자적 연산이어서 우리는 안전하게 여러 스레드로부터 이 메서드를 호출할 수 있습니다. AtomicInteger는 여러종류의 원자적 연산을 지원하며, updateAndGet() 메서드는 integer에 대한 임의의 수학적 연산을 수행하기 위해 람다 표현식을 수용합니다.
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.updateAndGet(n -> n + 2); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 2000
accumulateAndGet() 메서드는 IntBinaryOperator타입의 람다 표현식에서 사용될 수 있습니다. 우리는 다음 예제에서 0부터 1000까지의 모든 값을 더하기 위한 메서드를 보고자 합니다.
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500
이 외에도 다른 유용한 원자적 클래스들은 AtomicBoolean, AtomicLongm 그리고 AtomicReference가 있습니다.
2. LongAdder
AtomicLong의 대안이 되는 LongAdder클래스는 어떤 number에 값들을 연속적으로 추가할 때 사용될 수 있습니다.
ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(adder::increment)); stop(executor); System.out.println(adder.sumThenReset()); // => 1000
LongAdder는 atomic number class처럼 add()와 increment() 를 제공하며 이 또한 스레드 세이프(thread safe)합니다. 그러나 단일 결과를 sum하는 것 대신에 이 클래스는 내부적으로 스레드들에 대한 소유권 주장을 감소시키기 위해 변수집합(a set of variables)을 유지합니다. 그리고 sum()이나 sumThenReset()을 호출함으로서 우리가 원하는 결과를 얻을 수 있습니다.
이 클래스는 보통 여러 스레드들에서 read하는 것보다 update가 더 많이 나타나는 경우에 사용하면 좋습니다. update가 많이 일어나는 경우란, 웹서버에 request가 얼마나 많이 일어나는지 그 수를 count하는 등의 통계적인 데이터를 다뤄야하는 것과 같은 상황을 말합니다. LongAdder의 단점은 변수집합이 인메모리(in-memory)에 잡혀있기 때문에 메모리 소비가 상당하다는 점을 들 수 있습니다.
3. LongAccumulator
LongAccumulator는 LongAdder의 더 일반적인 버전이라고 볼 수 있습니다. 단순 더하기 연산을 수행하는 대신에 LongAccumulator클래스는 `LongBinaryoperator` 타입의 람다 연산식을 사용하여 해당 연산을 수행합니다. 아래의 예제 코드를 살펴봅시다.
LongBinaryOperator op = (x,y) -> 2*x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 10) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539
우리는 2 * x + y 라는 수식을 갖는 함수와 하나의 초기값(두번째 매개변수인 1L)을 갖는 LongAccumulator를 생성했습니다. accumulate(i)의 호출이 이루어질 때마다 현재 결과값과 변수 i는 람다 표현식에 매개변수로 주어지게 됩니다. LongAccumulator도 LongAdder처럼 여러 스레드들 간의 소유권 다툼을 감소시키기 위해서 내부적으로 변수집합(a set of variabels)을 유지합니다.
4. ConcurrentMap
인터페이스 `ConcurrentMap`은 map 인터페이스를 확장시켰을 뿐만 아니라 가장 유용한 동시성 컬렉션(concurrent collection)타입 중에 하나를 정의합니다. Java 8은 이 인터페이스에 새로운 메서드들을 추가함으로써 함수적인 프로그래밍(funcational programming)을 소개합니다.
새로운 메서드들을 설명하기 위해서 아래의 단순 map을 사용한 예제를 봅시다.
ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
아래 코드에서 forEach() 메서드는 파라미터로 map의 key와 value를 받는 BiConsumer타입의 람다 표현식을 수용합니다. 이것은 동시성 맵(concurrent map)이 갖는 엔트리들을 순회(iterate)하는 for-each loop를 대체하기 위해 사용합니다. 이 iteration은 현재 스레드에서 연속적으로 수행됩니다.
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
아래 코드에서 putIfAbsent() 메서드는 변수로 받은 키에 어떠한 값도 존재하지 않는 경우 새로운 값을 map에 입력합니다.(아래의 코드에서는 위에서 입력한대로 c3라는 key가 p0라는 value를 가지고 있기 때문에 p0가 결과로 나왔습니다. 만일, c3 key에 값이 null였다면 p1이 출력됐을 것입니다.) 적어도 이 ConcurrentHashMap는 thread-safe하기 때문에, 다른 스레드들이 동시에 map에 접근하는 경우에도 동기화(synchronize)를 따로 할 필요가 없습니다.
String value = map.putIfAbsent("c3", "p1"); System.out.println(value); // p0
getOrDefault() 메서드는 주어진 key에 값을 리턴합니다. 즉, 입력된 key 값이 존재하지 않는 경우 default로 설정된 값을 리턴받게 됩니다.
String value = map.getOrDefault("hi", "there"); System.out.println(value); // there
replaceAll()메서드는 BiFunction 타입의 람다 표현식을 수용합니다. BiFunction은 두 개의 파라미터를 입력 받고 단일값을 리턴하는 것을 말합니다. 이 경우는 각각의 map 엔트리의 key와 value를 받아 함수가 호출되고 현재 키에 할당된 새 값을 리턴하는 경우를 말합니다.
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3
map의 모든 값들을 대체(replacement)하는 것 대신에 `compute()`로 우리가 단일 엔트리를 변형시킬 수 있습니다. 이 메서드는 key와 bi-functin을 매개변수로 받습니다.
map.compute("foo", (key, value) -> value + value); System.out.println(map.get("foo")); // barbar
compute() 외에도 computeIfAbsent()와 computeIfPresent()라는 두 개의 함수가 있습니다. 만일 키가 각각 존재하거나 존재하지 않는다면 이 메서드들의 functional parameters가 호출되게 됩니다.
마지막으로 merge() 메서드는 map에 이미 존재하는 value와 새로운 value를 통합하는 작업을 하는 경우에 유용합니다. Merge는 key와 새로운 value를 받아서 이미 존재하는 map 엔트리에 합치는 작업을 할 수 있습니다. 그리고 bi-function이 두 값을 병합시키는데에 특정 로직을 수행하게 됩니다.
map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal); System.out.println(map.get("foo")); // boo was foo
5. ConcurrentHashMap
위에서 언급한 모든 메서드들은 ConcurrentMap 인터페이스의 일부분이며, ConcurrentMap은 map에 대한 병렬연산을 수행하는 두개의 새로운 메서드들로 더 강화됩니다.
병렬스트림(parallel stream)처럼 이 메서드들은 Java8에 있는 ForkJoinPool.commonPool()을 통해 이용할 수 있는 특별한 ForkJoinPool을 사용합니다.이 pool은 현재 사용 가능한 코어들의 수에 의존하여 미리 설정된 병렬화(parallelism)를 사용합니다. 현재 제 컴퓨터에는 4개의 CPU 코어가 존재하는데 결과적으로 3개는 병렬화가 가능합니다.
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
이 값은 아래에 나타나는 JVM 파라미터의 셋팅값에 의해 증가되거나 감소될 수 있습니다.
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
우리는 설명할 목적으로 위에서 사용했던 같은 예제를 다시 보고자 합니다.
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
Java8에서는 forEach, search, reduce라는 3가지 종류의 병렬 연산을 소개합니다. 이 연산들 각각은 key, value, 엔트리 또는 key-value 쌍을 인자로 받을 수 있습니다.
이 메서드들은 첫번째 인자로 parallelismThreshold라는 인자를 사용합니다. 이 threshold는 연산이 병렬적으로 실행되어져야 할 때 collection의 최소 size를 말합니다. 예를들어, 500개의 threshold를 넘겨받고, 맵의 사이즈가 499인 경우, 그 연산은 단일 스레드에서 연속적으로 실행 될 것입니다.
1) ForEach
forEach() 메서드는 map의 key-value 쌍을 병렬적으로 순회합니다. BiConsumer 타입의 람다 표현식은 현재 순회 스텝의 key와 value로 호출됩니다. 병렬실행을 시각화하기 위해서 우리는 현재 스레드들의 이름을 콘솔에 출력하려고 합니다. 우리가 기억해야 할 점은 ForkJoinPool이 3개의 스레드를 최대로 사용한다는 점입니다. (그래서 맨 아래에 나온 결과인 c3는 마지막에 출력됨.)
map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s\n", key, value, Thread.currentThread().getName())); // key: r2; value: d2; thread: main // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1 // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2 // key: c3; value: p0; thread: main
2) Search
Search()는 만일 현재 순회값이 예상했던 search 기준과 매칭되지 않는다면 현재 key-value 쌍의 non-null인 search 결과를 리턴하거나 null을 리턴하는 BiFunction을 수용합니다. 기억해야 할 점은 ConcurrentHashMap이 정렬되지 않은 상태(unordered)라는 점입니다. search 함수는 map의 처리 순서에 의존해서는 안된다는 점입니다. 만일 map의 여러 엔트리들이 주어진 search function과 일치하는 경우 그 결과는 비 결정적(non-deterministic : 어떤 함수 f가 있을 때, a->b가 아니라 a -> [b] 처럼 여러개의 결과를 내놓는 것.)일지도 모른다.
String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar
아래의 또 다른 예제를 보자
String result = map.searchValues(1, value -> { System.out.println(Thread.currentThread().getName()); if (value.length() > 3) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // main // ForkJoinPool.commonPool-worker-1 // Result: solo
3) Reduce
이미 Java 8 스트림에서 잘 알려져 있는 reduce()메서드는 Bifunction 타입의 두 개의 람다 표현식을 수용합니다. 첫번째 함수는 각각의 key-value 쌍을 어떤 타입의 단일값으로 변형시킵니다. 두 번째 함수는 이 모든 변형된 값들을 하나의 결과로 결합하고, null값의 경우는 무시합니다.
String result = map.reduce(1, (key, value) -> { System.out.println("Transform: " + Thread.currentThread().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println("Reduce: " + Thread.currentThread().getName()); return s1 + ", " + s2; }); System.out.println("Result: " + result); // Transform: ForkJoinPool.commonPool-worker-2 // Transform: main // Transform: ForkJoinPool.commonPool-worker-3 // Reduce: ForkJoinPool.commonPool-worker-3 // Transform: main // Reduce: main // Reduce: main // Result: r2=d2, c3=p0, han=solo, foo=bar
이 글을 작성한 사람으로서 Java 8의 동시성에 대한 나의 튜토리얼의 세번째 파트를 즐겁게 읽어주셨으면 좋겠습니다. 이 튜토리얼에서 나온 모든 예제코드는 <https://github.com/winterbe/java8-tutorial> 에 있습니다.
출처: https://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/
'개발언어 > Java' 카테고리의 다른 글
Java Exception (0) 2020.03.16 예외 처리 (0) 2020.03.15 [Java 기초] Stream - 1 (0) 2019.05.26 [Java 기초] 멀티 스레드 - 1 (2) 2019.05.13