Это руководство дает не только общее представление о Stream в Java 8, но и полезные знания которые будут востребованы в будущем. Когда я впервые узнал о Stream API, я был удивлен таким названием, поскольку это звучит очень похоже на InputStream
или OutputStream
из Java I/O. Но Java 8 Stream это совсем другое. Streams (стримы/потоки) являются Монадами, которые играют большую роль в обеспечении функционального программирования в Java.
В функциональном программировании монада является структурой, которая представляет вычисления в виде последовательности шагов.
Это руководство научит вас основам работы с Java 8 Streams, а также научит использовать операции над потоками Streams. Вы узнаете о порядке обработки и о том, как упорядочение потока операций влияет на производительность во время выполнения. Более мощные операции Stream API: reduce, collect и flatMap также будут подробно описаны. Руководство заканчивается углубленным изучением параллельных потоков.
Как потоки работают?
Поток представляет собой последовательность элементов и поддерживает различные виды операций для выполнения вычислений:
1 2 3 4 5 6 7 8 9 10 11 |
List<String> mList = Arrays.asList("aa1","cc2", "cc1", "aa2", "bb1"); mList .stream() .filter(s -> s.startsWith("a")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); //Результат выполнения: // AA1 // AA2 |
Операции над потоком относятся либо к промежуточным, либо к терминальным. Все промежуточные операции возвращают поток, так что мы можем объединять несколько промежуточных операций без использования точки с запятой. Терминальные операции возвращают void или непотоковый результат. В приведенном выше примере filter, map и sorted являются промежуточными операциями, тогда как forEach является терминальной операцией. Для получения полного списка всех доступных операций потока смотрите Javadoc по Stream.
Большинство операций потока принимают в качестве параметров какие-то лямбда-выражения, в функциональный интерфейс точное поведение по каждой операции. Большинство этих операций должны быть как неинтерферирующими (non-interfering), так и лишенными состояния (stateless). Что это значит?
Неинтерферирующуя функция не изменяет основной источник данных потока. Например, в приведенном выше примере лямбда выражение не изменяет mList
путем добавления или удаления элементов из коллекции.
Лишенная состояния функция — выполнение операции является детерминированным, например, в приведенном выше примере лямбда-выражение не зависит от какой-либо изменяемой переменной или состояния из внешней среды, которая могла бы измениться во время выполнения.
Различные виды потоков (Streams)
Потоки могут быть созданы из различных источников данных, особенно коллекций. Коллекции List и Set поддерживают новые методы stream() и parallelStream(). Параллельные потоки способны работать на нескольких нитях и будут рассмотрены в следующем разделе этого руководства. Мы ориентируемся на последовательные потоки:
1 2 3 4 5 |
Arrays.asList("сс1", "сс2", "сс3") .stream() .findFirst() .ifPresent(System.out::println); //Результат выполнения: сс1 |
Вызов метода stream()
по перечню объектов возвращает поток. Но мы не должны создавать коллекции для того, чтобы работать с потоками. Смотрим на следующем примере кода:
1 2 3 4 |
Stream.of("сс1", "сс2", "сс3") .findFirst() .ifPresent(System.out::println); //Результатом будет сс1 |
Просто используйте Stream.of()
, чтобы создать поток с кучей ссылок на объекты.
Кроме регулярных объектов потоков Java 8, еще предоставляются особые виды потоков для работы с примитивными типами данных int
, long
и double
. Как вы уже догадались, это IntStream
, LongStream
и DoubleStream
.
IntStreams может заменить обычный цикл for на изящный IntStream.range()
:
1 2 3 4 5 6 7 |
IntStream.range(8, 12) .forEach(System.out::println); //Результат выполнения: // 8 // 9 // 10 // 11 |
Все эти примитивные потоки работают так же, как и обычные объектные потоки, но со следующими отличиями:
Примитивные потоки используют специализированные лямбда-выражения, например IntFunction
Function или
IntPredicate
вместо Predicate
. И примитивные потоки поддерживают дополнительные терминальные операции sum()
и average()
:
1 2 3 4 5 |
Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); //Результатом выполнения будет 5.0 |
Иногда это полезно преобразовать поток объекта к примитивному потоку или наоборот. Для этой цели потоки объектов поддерживают специальные операций картирования mapToInt()
, mapToLong()
и mapToDouble()
:
1 2 3 4 5 6 |
Stream.of("c1", "c2", "c3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); //Результатом выполнения будет 3 |
Примитивные потоки могут быть преобразованы в объектные потоки с помощью метода mapToObj()
:
1 2 3 4 5 6 7 |
IntStream.range(1, 4) .mapToObj(i -> "с" + i) .forEach(System.out::println); // Результат выполнения операции: // с1 // с2 // с3 |
Вот комбинированный пример: поток с double сначала преобразуется в int’овый поток и затем в объектный поток строк:
1 2 3 4 5 6 7 8 |
Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "с" + i) .forEach(System.out::println); // Результат выполнения операции: // с1 // с2 // с3 |
Порядок обработки
Теперь, когда мы научились создавать и работать с различными видами потоков, давайте углубляться в то, как поток операций обрабатываются под капотом.
Посмотрите на этот образец, где отсутствует терминальная операция:
1 2 3 4 5 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .filter(s -> { System.out.println("Фильтр: " + s); return true; }); |
При выполнении этого фрагмента кода ничего не печатается на консоль. Это происходит потому, что промежуточные операции будут выполнены только тогда, когда присутствуют терминальные операции.
Давайте расширим предыдущий пример использованием forEach:
1 2 3 4 5 6 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .filter(s -> { System.out.println("Фильтр: " + s); return true; }) .forEach(s -> System.out.println("Печать с использованием forEach: " + s)); |
Выполнив этот код, на консоль выведется следующее:
1 2 3 4 5 6 7 8 9 10 |
Фильтр: dd2 Печать с использованием forEach: dd2 Фильтр: aa2 Печать с использованием forEach: aa2 Фильтр: bb1 Печать с использованием forEach: bb1 Фильтр: bb3 Печать с использованием forEach: bb3 Фильтр: cc4 Печать с использованием forEach: cc4 |
Порядок выполнения может вас удивить. На первый взгляд все операции будут выполняться по горизонтали одна за другой по всем элементам потока. Но в нашем примере первая строка «dd2» полностью проходит фильтр forEach, потом обрабатывается вторая строка «aa2» и так далее.
Такое поведение может привести к снижению фактического количества операций, выполняемых на каждом элементе. Это мы видим на следующем примере:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // Результат выполнения представлен ниже // map: dd2 // anyMatch: DD2 // map: aa2 // anyMatch: AA2 |
Операция anyMatch
возвращает true, как только предикат применится к входному элементу. Это подходит для второго элемента «аа2». В связи с вертикальным исполнением цепи потока, map будет выполнен два раза.
Почему порядок выполнения в stream имеет значение
Следующий пример состоит из двух промежуточных операций map и filter, а также выполнение терминала forEach
. Давайте еще раз посмотрим порядок выполнения этих операций:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // Результат выполнения // map: dd2 // filter: DD2 // map: aa2 // filter: AA2 // forEach: AA2 // map: bb1 // filter: BB1 // map: bb3 // filter: BB3 // map: cc // filter: CC |
Как вы уже догадались, map и filter называются пять раз для каждой строки в базовой коллекции, тогда как forEach вызывается только один раз.
Мы можем сильно уменьшить фактическое количество выполнений, если мы изменим порядок операций, передвинув filter в начало цепочки:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: dd2 // filter: aa2 // map: aa2 // forEach: AA2 // filter: bb1 // filter: bb3 // filter: cc |
Теперь, map вызывается только один раз и будет выполняться быстрее для большого количества входных элементов. Имейте это в виду при составлении комплексного метода цепи.
Давайте расширим предыдущий пример с дополнительной операцией, sorted:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); |
Сортировка является особым видом промежуточных операций. Это так называемые операции состояния.
Выполнение этого примера приводит следующий вывод консоли:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
sort: aa2; dd2 sort: bb1; aa2 sort: bb1; dd2 sort: bb1; aa2 sort: bb3; bb1 sort: bb3; dd2 sort: cc4; bb3 sort: cc4; dd2 filter: aa2 map: aa2 forEach: AA2 filter: bb1 filter: bb3 filter: cc4 filter: dd2 |
Во-первых, операция сортировки выполняется на всей совокупности входных данных. Другими словами, sorted
выполнен в горизонтальном направлении. Таким образом, sorted вызывается 8 раз для нескольких комбинаций на каждом элементе во входной коллекции.
Теперь мы можем оптимизировать производительность:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Stream.of("dd2", "aa2", "bb1", "bb3", "cc4") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: dd2 // filter: aa2 // filter: bb1 // filter: bb3 // filter: cc4 // map: aa2 // forEach: AA2 |
В этом примере sorted никогда не вызывали, потому что filter уменьшает входную коллекцию до одного элемента. В этом случае производительность значительно увеличивается для больших входных коллекций.
Повторное использование Потоков (Streams)
Потоки в Java 8 не могут быть использованы повторно. Как только вы называете какую-нибудь терминальную операция, поток закрывается
1 2 3 4 5 6 |
Stream<String> stream = Stream.of("dd2", "aa2", "bb1", "bb3", "cc") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // операция выполнится успешно stream.noneMatch(s -> true); // Вылетит Exception |
Вызов noneMatch
после anyMatch
в одном и том же stream вызовет следующее исключение:
1 2 3 4 |
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) ... |
Чтобы избежать этого, мы должны создать новую цепь для каждой терминальной операции.
1 2 3 4 5 6 |
Supplier<Stream<String>> streamSupplier = () -> Stream.of("dd2", "aa2", "bb1", "bb3", "cc") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // операция пройдет успешно streamSupplier.get().noneMatch(s -> true); // здесь также все будет ok |
Продвинутые операции
collect
, flatMap
и reduce
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Andrew", 20), new Person("Igor", 23), new Person("Ira", 23), new Person("Vitia", 12)); |
Наверняка многие из вас обратили внимание на очень удобный трюк создания и инициализации List в одну строку.
Операция Collect
Collector,
который состоит из четырех различных операций: поставщик, аккумулятор, объединитель и финишер. Это звучит очень сложно, но это только на первый взгляд. Фишкой Java 8 является поддержка различных встроенных коллекторов через класс Collectors
. Именно поэтому работа с ними будет намного проще.
1 2 3 4 5 6 7 |
List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("I")) .collect(Collectors.toList()); System.out.println(filtered); // [Igor, Ira] |
Как видите, у нас получилось всего 6 строчек кода.
Следующий пример группирует всех по возрасту:
1 2 3 4 5 6 7 8 9 10 |
Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 20: [Andrew] // age 23: [Igor, Ira] // age 12: [Vitia] |
Collectors — чрезвычайно универсальные. Вы также можете создавать группы элементов потока, например, определение среднего возраста всех лиц:
1 2 3 4 5 |
Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.5 |
Если вы заинтересованы в более полной статистике, то collectors возвращает специальный встроенный объект со сводной статистикой. Таким образом, мы можем просто определить минимальный, максимальный и средний арифметический возраст, а также найти сумму и количество.
1 2 3 4 5 6 7 8 |
IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // Результат выполнения: // IntSummaryStatistics{count=4, sum=78, min=12, average=19.500000, max=23} |
Следующий пример соединяет всех людей в одну строку:
1 2 3 4 5 6 7 8 |
String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" и ", "В Германии ", " совершеннолетние.")); System.out.println(phrase); // В Германии Andrew и Igor и Ira совершеннолетние. |
Коллектор join принимает разделитель, а также дополнительный префикс и суффикс.
Для того чтобы трансформировать элементы потока в map, мы должны указать, как ключи и значения должны быть нанесены на map. Имейте в виду, что замапенные ключи должны быть уникальными, иначе IllegalStateException
не избежать. Вы можете передать функцию слияния в качестве дополнительного параметра, чтобы обойти исключение:
1 2 3 4 5 6 7 8 9 |
Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {20=Andrew, 23=Igor;Ira, 12=Vitya} |
Теперь, когда мы знаем, некоторые мощные встроенные коллекторы, давайте попробуем построить свой собственный специальный коллектор. Мы хотим превратить всех людей в потоке в одну строку, состоящую из всех имен в верхнем регистре, разделенных знаком "|"
.
Для достижения этого мы создаем новый коллектор через Collector.of()
. Мы должны пройти четыре этапа использования collectors: supplier, accumulator, combiner и finisher.
1 2 3 4 5 6 7 8 9 10 11 12 |
Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // ANDREW | IGOR | IRA | VITIA |
Так как строки в Java неизменные, нам нужен вспомогательный класс StringJoiner
, чтобы коллектор мог построить нашу строку. Supplier изначально создает такой StringJoiner с соответствующим разделителем. Accumulator используется для добавления имени каждого человека в верхний регистр. Combiner знает как объединить два StringJoiner в один. На последнем этапе Finisher строит желаемую строку из StringJoiner.
FlatMap
Мы уже научились преобразовывать объекты потока в другой тип объектов, используя операции с map. Map ограничена, потому что каждый объект может быть отображен только одним объектом. Но что, если мы хотим преобразовать один объект в нескольких других? На помощь здесь приходит flatMap
.
FlatMap преобразует каждый элемент потока в поток других объектов. Таким образом, каждый объект будет преобразован в ноль, один или несколько других объектов, поддерживаемых потоком. Содержание этих потоков будет затем помещают в возвращаемом потоке flatMap
операции.
Прежде, чем мы увидим flatMap
в действии мы должны соответствующий тип иерархии:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } |
Далее, мы используем наши знания о потоках для того, чтобы создать несколько объектов:
1 2 3 4 5 6 7 8 9 10 11 12 |
List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); |
Теперь у нас есть список из трех функций, каждая из которых состоит из трех баров. FlatMap принимает функцию, которая должна возвращать поток объектов:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 |
Как видите, мы успешно преобразовали поток трех объектов Foo в поток девяти объектов.
Наконец, код выше можно упростить:
1 2 3 4 5 6 7 |
IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); |
FlatMap также доступна для Optional класса, введенного в Java 8. Optional операции класса flatMap возвращает дополнительный объект другого типа..
Посмотрите на иерархическую структуру типа этой:
1 2 3 4 5 6 7 8 9 10 11 |
class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } |
Далее нужно добавить многочисленные проверки на нулевые значения.
1 2 3 4 |
Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } |
Такое же поведение можно получить, используя операцию optionalflatMap :
1 2 3 4 5 |
Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); |
Каждый вызов flatMap возвращает Optional обертку объекта.
Reduce
Операция Reduce сочетает в себе все элементы потока в единый результат. Java 8 поддерживает три различных вида reduce метода. Давайте посмотрим, как мы можем использовать один метод для определения самого старшего человека:
1 2 3 4 |
persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Ira |
Первый Reduce метод
Reduce метод принимает функцию аккумулятора BinaryOperator. Это на самом деле BiFunction, когда оба операнда имеют один и тот же тип, в этом случае Person. BiFunctions похожи на Function, но принимает два аргумента. Пример функции сравнивает людей по возрасту и возвращает самого старшего.
Второй Reduce метод
Второй Reduce метод принимает идентифицирующее значение и BinaryOperator. Этот метод может быть использован для «создания» нового человека с агрегированным имен и возрастом других человек в потоке:
1 2 3 4 5 6 7 8 9 10 11 |
Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=AndrewIgorIraVitia; age=78 |
Третий Reduce
Третий Reduce метод принимает три параметра: значение идентификатора,BiFunction аккумулятор и объединитель функции типа BinaryOperator. Поскольку идентифицирующее значение не ограничивает тип Person, мы можем использовать это сокращение для определения суммы возрасте от всех лиц:
1 2 3 4 5 |
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 78 |
Как видим, результат получился 78, но что же произошло под катом? Давайте посмотрим вывод с подробным описанием:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); //accumulator: sum=0; person=Andrew //accumulator: sum=20; person=Igor //accumulator: sum=43; person=Ira //accumulator: sum=66; person=Vitia |
Как видим, аккумулирующая функция делает всю работу. Сначала вызывается инициализирующая значение 0 и первый человек Андрей. В следующих трех вызовах «sum» увеличивается возраст до суммарного 78.
Подождите, что? Комбайнер никогда не вызывается? Выполнение этого стрима параллельно раскроет секрет:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); //accumulator: sum=0; person=Ira //accumulator: sum=0; person=Vitia //accumulator: sum=0; person=Andrew //accumulator: sum=0; person=Igor //combiner: sum1=23; sum2=12 //combiner: sum1=20; sum2=23 //combiner: sum1=43; sum2=35 |
Выполнение этого потока параллельно приведет к совершенно иным результатам. Теперь комбайнер действительно вызывается. С тех пор как аккумулятор вызывается параллельно, комбайнеру необходимо суммировать отдельные значения.
Давайте глубже погрузимся в мир параллельных потоков в следующей главе.
Параллельные потоки
Потоки могут быть выполнены параллельно, чтобы увеличить производительность выполнения на большом количестве входных элементов. Параллельные потоки используют общий ForkJoinPool
доступный через статический ForkJoinPool.commonPool()
метод. Размер основного пула потоков использует до пяти нитей — в зависимости от количества доступных физических ядер процессора:
1 2 |
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 |
На моей машине общий пул инициализируется с параллелизмом 3 по-умолчанию. Это значение может быть уменьшено или увеличено путем установки следующих параметров JVM:
1 |
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5 |
Коллекции поддерживает метод parallelStream(), чтобы создать параллельный поток элементов. Кроме того, вы можете вызвать промежуточный метод parallel() на данном потоке, чтобы преобразовать последовательный поток в параллельной копии.
Для того, чтобы занизить поведение параллельного выполнения параллельного потока Следующий пример печатает информацию о текущем потоке в Sout
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); |
После дебага мы получим лучшее понимание, какие потоки на самом деле используется для выполнения операций потока:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] |
Как вы можете видеть, параллельный поток использует все доступные темы из общей ForkJoinPool
для выполнения операций потока. Вывод может отличаться при последовательном запуске, потому что поведение, которое конкретный поток использует не является детерминированным.
Давайте расширим например с помощью дополнительной операции потока — sort:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); |
Результат может быть странным на первый взгляд:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] |
Кажется, что sort
выполняется последовательно только основной нитью. Но это не так. На самом деле, sort
на параллельном потоке использует новый Java 8 метод Arrays.parallelSort()
под капотом. Имейте в виду, что отладочный вывод относится только к исполнению переданного лямбда-выражения. Так, sort
компаратор выполнен только на главном потоке, но фактическое алгоритм сортировки выполняется параллельно.
Возвращаясь к reduce
, например, из последней секции. Мы уже выяснили, что функция комбайнер вызывается только параллельно, но не в последовательных потоках. Давайте посмотрим, какие потоки фактически участвуют:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
List<Person> persons = Arrays.asList( new Person("Andrew", 20), new Person("Igor", 23), new Person("Ira", 23), new Person("Vitia", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); |
Вывод на консоль показывает, что оба аккумулятора и комбайнера функции выполняются параллельно на всех доступных потоках:
1 2 3 4 5 6 7 |
accumulator: sum=0; person=Ira [main] accumulator: sum=0; person=Andrew [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Vitia [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=Igor [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12 [ForkJoinPool.commonPool-worker-3] combiner: sum1=20; sum2=23 [ForkJoinPool.commonPool-worker-1] combiner: sum1=43; sum2=35 [ForkJoinPool.commonPool-worker-1] |
Таким образом, можно констатировать, что параллельные потоки могут дать хороший прирост производительности в потоках с большим количеством входных элементов. Но имейте в виду, что некоторые параллельные операции потока reduce
и collect требуют дополнительные расчеты (комбинированные операции), которые не нужны при последовательном выполнении.
Итоги
Руководство по программированию стримов на Java 8 закончилось. Если Вас заинтересовали новые возможности Java 8, рекомендую ознакомится с документацией Stream Javadoc.
Делитесь этой статьей с друзьями и коллегами, а также следите за обновлениями на Prologistic.com.ua
P.S. Подробнее о других особенностях Java 8 вы можете ознакомиться в этой статье.
Перевод статьи winterbe.com
Код который следует после фразы «Выполнив этот код, на консоль выведется следующее:»
есть опечатка, отсутствует цифра 4 в выводе в консоль
должно быть так:
Фильтр: cc4
Печать с использованием forEach: cc4
Да, Вы правы. Спасибо за исправление!
В последнем предложении раздела «Почему порядок выполнения в stream имеет значение» опечатка:
«В этом случае производительность значительно снижается для больших входных коллекций.». Нужно так:
«В этом случае производительность значительно УВЕЛИЧИВАЕТСЯ для больших входных коллекций.»
Да, Вы правы — уже подправил. Спасибо за замечание!
Большое спасибо за хороший цикл статей по восьмой версии!
Спасибо!
На первый взгляд, выглядит как имплементация паттерна билдер:
mList
.stream()
.filter(s -> s.startsWith(«a»))
.map(String::toUpperCase)
.sorted()
Так ли это?
Как узнать стрим сортированный или нет?
void apply(Stream stream) {
if(!stream.isSoretd()) stream = stream.sorted();
…..
}