Java 8 - Aggregate operations on Streams

This post continues my earlier posts on Streams. In this post, we will discuss aggregate operations on Streams.

Aggregate operations on Streams

You can perform intermediate and terminal operations on Streams. Intermediate operations return a new stream and are evaluated lazily: they don't start processing elements until a terminal operation is called.

persons.stream().filter(p -> p.getGender() == Gender.MALE).forEach(System.out::println);

In the snippet above, filter() doesn't start filtering immediately; it only creates a new stream. Filtering starts when a terminal operation is called, which in this case is forEach().
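To see the laziness for yourself, here is a minimal, self-contained sketch. It uses a plain list of names instead of the post's Person class (which isn't shown here), and peek(), which exists mainly for debugging, to record each element as it enters the pipeline. The class and field names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyEvaluationDemo {

    // Every element that enters the pipeline is recorded here by peek().
    static final List<String> visited = new ArrayList<>();
    static int visitedBeforeTerminal = -1;

    static List<String> run() {
        visited.clear();
        // Building the pipeline only describes the work; no element is processed yet.
        Stream<String> pipeline = Stream.of("Gaurav", "Nisha", "Rajesh", "Neha")
                .peek(visited::add)
                .filter(name -> name.startsWith("N"));
        visitedBeforeTerminal = visited.size(); // still 0 at this point

        // collect() is the terminal operation that pulls the elements through.
        return pipeline.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = run();
        System.out.println("visited before terminal op: " + visitedBeforeTerminal); // 0
        System.out.println("visited after terminal op: " + visited); // [Gaurav, Nisha, Rajesh, Neha]
        System.out.println("result: " + result); // [Nisha, Neha]
    }
}
```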

Intermediate operations

There are many intermediate operations that you can perform on Streams. Some of them are filter(), distinct(), sorted(), limit(), parallel(), sequential(), map() and flatMap().

filter() operation

filter() takes a Predicate functional interface as its argument, and its output stream contains only the elements for which the Predicate returns true. You can find a nice explanation of Predicates here.

// all the males
List<Person> allMales = persons.stream().filter(p -> p.getGender() == Gender.MALE).collect(Collectors.toList());
System.out.println(allMales);
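The Predicate doesn't have to be an inline lambda. Here is a minimal sketch, using integers rather than the post's Person class, that declares two Predicate variables and combines them with Predicate.and() before handing the result to filter(). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterDemo {

    static List<Integer> evenAndAboveTen(List<Integer> numbers) {
        // filter() accepts any Predicate<T>; two predicates are combined with and().
        Predicate<Integer> isEven = n -> n % 2 == 0;
        Predicate<Integer> aboveTen = n -> n > 10;
        return numbers.stream()
                .filter(isEven.and(aboveTen))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evenAndAboveTen(Arrays.asList(4, 11, 12, 15, 20))); // [12, 20]
    }
}
```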

map() operation

map() is a mapping operation. It takes a Function functional interface as its argument. The purpose of a Function is to transform a value of one type into another type (which may also be the same type).

// first names of all the persons
List<String> firstNames = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNames);
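A short sketch that spells out the Function explicitly. It works on a hypothetical list of names instead of Person objects: String::length turns a Stream<String> into a Stream<Integer>, while String::toUpperCase keeps the element type unchanged.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class MapDemo {

    static List<Integer> nameLengths(List<String> names) {
        // A Function<T, R> turns each element of type T into a value of type R.
        Function<String, Integer> length = String::length;
        return names.stream()
                .map(length) // Stream<String> becomes Stream<Integer>
                .collect(Collectors.toList());
    }

    static List<String> upperCase(List<String> names) {
        // The output type can also be the same as the input type.
        return names.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Gaurav", "Jiya", "Neeraj");
        System.out.println(nameLengths(names)); // [6, 4, 6]
        System.out.println(upperCase(names));   // [GAURAV, JIYA, NEERAJ]
    }
}
```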

distinct()

distinct() returns a stream of the unique elements; it uses equals() under the hood to detect duplicates.

List<String> uniqueFirstNames = persons.stream().map(Person::getFirstName).distinct().collect(Collectors.toList());

System.out.println(uniqueFirstNames);
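Whether distinct() removes duplicate objects depends on equals(). The sample data contains Gaurav Mazra twice; if Person doesn't override equals() (and hashCode()), distinct() on a stream of Person objects would keep both entries. The two hypothetical classes below illustrate the difference:

```java
import java.util.Arrays;
import java.util.Objects;

class DistinctDemo {

    // Hypothetical class that does NOT override equals()/hashCode(): identity equality.
    static final class PlainName {
        final String value;
        PlainName(String value) { this.value = value; }
    }

    // Hypothetical class that DOES override equals() and hashCode(): value equality.
    static final class Name {
        final String value;
        Name(String value) { this.value = value; }

        @Override
        public boolean equals(Object o) {
            return o instanceof Name && ((Name) o).value.equals(value);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(value);
        }
    }

    static long distinctPlainNames() {
        // Two separate objects that are never equal to each other, so both survive.
        return Arrays.asList(new PlainName("Gaurav"), new PlainName("Gaurav"))
                .stream().distinct().count();
    }

    static long distinctNames() {
        // equals() reports the two objects as equal, so one is dropped.
        return Arrays.asList(new Name("Gaurav"), new Name("Gaurav"))
                .stream().distinct().count();
    }

    public static void main(String[] args) {
        System.out.println(distinctPlainNames()); // 2
        System.out.println(distinctNames());      // 1
    }
}
```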

sorted()

sorted() sorts the stream elements, either in natural order or with a supplied Comparator. It is a stateful operation.

List<Person> sortedByAge = persons.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList());
System.out.println(sortedByAge);
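Comparators can also be composed. This sketch, on a hypothetical list of names, sorts once in natural order and once by length (longest first) with ties broken alphabetically, using Comparator.comparingInt(), reversed() and thenComparing():

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedDemo {

    // No argument: natural order (alphabetical for String).
    static List<String> alphabetical(List<String> names) {
        return names.stream().sorted().collect(Collectors.toList());
    }

    // Longest name first; names of equal length in alphabetical order.
    static List<String> longestFirst(List<String> names) {
        Comparator<String> byLengthDesc = Comparator.comparingInt(String::length).reversed();
        return names.stream()
                .sorted(byLengthDesc.thenComparing(Comparator.<String>naturalOrder()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Neha", "Gaurav", "Jiya", "Sunil");
        System.out.println(alphabetical(names)); // [Gaurav, Jiya, Neha, Sunil]
        System.out.println(longestFirst(names)); // [Gaurav, Sunil, Jiya, Neha]
    }
}
```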

limit()

limit(n) truncates the stream to at most n elements. It is helpful for turning an infinite stream into a finite one.
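A minimal sketch of that use case, assuming we want the first n powers of two: Stream.iterate() produces an infinite stream, and limit() makes it finite so that collect() can complete.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LimitDemo {

    static List<Integer> firstPowersOfTwo(int n) {
        // Without limit(n), collect() would never finish on this infinite stream.
        return Stream.iterate(1, x -> x * 2)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```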

Intermediate operations can be divided into two groups: stateless and stateful. Most intermediate operations, such as map() and filter(), are stateless: each element is processed independently of the others. Some, such as distinct(), sorted() and limit(), are stateful because they have to keep track of elements they have already seen (sorted() even has to see the whole input before it can emit anything).

Terminal/reduction operations

There are many terminal operations, such as forEach(), reduce(), max(), min(), average(), collect(), findAny(), findFirst(), anyMatch(), allMatch() and noneMatch().

forEach()

forEach() takes a Consumer functional interface as its parameter and passes each element to it for consumption.

persons.stream().forEach(System.out::println);

max(), min(), average() operations

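On an IntStream, average() returns an OptionalDouble, whereas max() and min() return an OptionalInt. The results are wrapped in optionals because an empty stream has no average, maximum or minimum.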

//average age of all persons
persons.stream().mapToInt(Person::getAge).average().ifPresent(System.out::println);

// max age from all persons
persons.stream().mapToInt(Person::getAge).max().ifPresent(System.out::println);

// min age from all persons
persons.stream().mapToInt(Person::getAge).min().ifPresent(System.out::println);
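Because these methods return optionals, you have to decide what happens for an empty stream. A small sketch with plain int ages (hypothetical data) that falls back to a default for average() and checks isPresent() for max():

```java
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.stream.IntStream;

class OptionalResultsDemo {

    // average() has no value for an empty stream, so fall back to 0.0.
    static double averageOrZero(int... ages) {
        OptionalDouble average = IntStream.of(ages).average();
        return average.orElse(0.0);
    }

    // max() is returned as-is so the caller can check isPresent().
    static OptionalInt maxAge(int... ages) {
        return IntStream.of(ages).max();
    }

    public static void main(String[] args) {
        System.out.println(averageOrZero(28, 24, 32)); // 28.0
        System.out.println(averageOrZero());           // 0.0
        System.out.println(maxAge(28, 24, 32));        // OptionalInt[32]
        System.out.println(maxAge().isPresent());      // false
    }
}
```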

noneMatch(), allMatch(), anyMatch()

These check whether none, all or any of the stream's elements, respectively, satisfy a given Predicate, and return a boolean. They are short-circuiting: they stop as soon as the answer is known.

// true if every female in the group is younger than 22
boolean allFemalesUnder22 = persons.stream().filter(p -> p.getGender() == Gender.FEMALE).allMatch(p -> p.getAge() < 22);
// true if no male in the group is older than 30
boolean noMaleOver30 = persons.stream().filter(p -> p.getGender() == Gender.MALE).noneMatch(p -> p.getAge() > 30);

// true if at least one person is older than 45
boolean anyoneOver45 = persons.stream().anyMatch(p -> p.getAge() > 45);
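Two details worth knowing, shown below on hypothetical data: on an empty stream, allMatch() and noneMatch() return true while anyMatch() returns false; and because these operations short-circuit, anyMatch() stops at the first matching element (observed here with peek() on a sequential stream).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class MatchDemo {

    // No element can violate the condition, so allMatch() and noneMatch() are true.
    static boolean allMatchOnEmpty()  { return Stream.<Integer>empty().allMatch(age -> age > 100); }
    static boolean noneMatchOnEmpty() { return Stream.<Integer>empty().noneMatch(age -> age > 100); }
    // No element can satisfy the condition, so anyMatch() is false.
    static boolean anyMatchOnEmpty()  { return Stream.<Integer>empty().anyMatch(age -> age > 100); }

    // Returns the elements anyMatch() examined before it short-circuited.
    static List<Integer> inspectedByAnyMatch() {
        List<Integer> inspected = new ArrayList<>();
        Stream.of(20, 46, 30, 50).peek(inspected::add).anyMatch(age -> age > 45);
        return inspected; // [20, 46]: 30 and 50 are never examined
    }

    public static void main(String[] args) {
        System.out.println(allMatchOnEmpty());     // true
        System.out.println(noneMatchOnEmpty());    // true
        System.out.println(anyMatchOnEmpty());     // false
        System.out.println(inspectedByAnyMatch()); // [20, 46]
    }
}
```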

Reduction operations

Reduction operations are those that produce a single value as their result. Some of them appeared in the previous snippets, e.g. max(), min() and average(); sum() is another. Apart from these, Java 8 provides two general-purpose reduction operations: reduce() and collect().

reduce()

// rangeClosed(1, 10) includes 10; range(1, 10) would stop at 9 and give 45 instead of 55
int sumOfFirst10 = IntStream.rangeClosed(1, 10).reduce(0, Integer::sum);
System.out.println(sumOfFirst10);
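reduce() comes in three forms. The sketch below shows each one on hypothetical data: with an identity value (which is also the result for an empty stream), without one (so the result is an Optional), and with an identity, accumulator and combiner, which allows the result type to differ from the element type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

class ReduceDemo {

    // Form 1: reduce(identity, accumulator). The identity is the result for an empty stream.
    static int sumTo(int n) {
        return IntStream.rangeClosed(1, n).reduce(0, Integer::sum);
    }

    // Form 2: reduce(accumulator). Without an identity an empty stream has no result,
    // hence the Optional. On ties, the earlier word is kept.
    static Optional<String> longest(List<String> words) {
        return words.stream().reduce((a, b) -> a.length() >= b.length() ? a : b);
    }

    // Form 3: reduce(identity, accumulator, combiner). The result type (Integer) differs
    // from the element type (String); the combiner merges partial results of a parallel run.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0, (sum, word) -> sum + word.length(), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("map", "filter", "reduce");
        System.out.println(sumTo(10));          // 55
        System.out.println(sumTo(0));           // 0
        System.out.println(longest(words));     // Optional[filter]
        System.out.println(totalLength(words)); // 15
    }
}
```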

collect()

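collect() performs a mutable reduction: it accumulates the elements into a mutable result container such as a List or a Map. The Collectors class provides many useful collectors, such as toList(), groupingBy(), averagingInt(), counting(), summarizingInt(), mapping() and joining().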

Collection<Person> persons = StreamSamples.getPersons();
List<String> firstNameOfPersons = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNameOfPersons);

Map<Integer, List<Person>> personByAge = persons.stream().collect(Collectors.groupingBy(Person::getAge));
System.out.println(personByAge);

Double averageAge = persons.stream().collect(Collectors.averagingInt(Person::getAge));
System.out.println(averageAge);

Long totalPersons = persons.stream().collect(Collectors.counting());
System.out.println(totalPersons);

IntSummaryStatistics personsAgeSummary = persons.stream().collect(Collectors.summarizingInt(Person::getAge));

System.out.println(personsAgeSummary);

String allPersonsFirstName = persons.stream().collect(Collectors.mapping(Person::getFirstName, Collectors.joining("#")));
System.out.println(allPersonsFirstName);
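Collectors can also be nested. A minimal sketch on a hypothetical list of names: groupingBy() with a downstream counting() collector and a TreeMap factory so that the keys print in sorted order, plus joining() with a delimiter, prefix and suffix.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class CollectorsDemo {

    static final List<String> NAMES = Arrays.asList("Gaurav", "Gaurav", "Sandeep", "Neha", "Nisha");

    // Count names per initial letter; TreeMap::new keeps the keys sorted.
    static Map<Character, Long> countByInitial() {
        return NAMES.stream().collect(
                Collectors.groupingBy(name -> name.charAt(0), TreeMap::new, Collectors.counting()));
    }

    // joining(delimiter, prefix, suffix) builds a single String from the unique names.
    static String joinedUniqueNames() {
        return NAMES.stream().distinct().collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(countByInitial());    // {G=2, N=2, S=1}
        System.out.println(joinedUniqueNames()); // [Gaurav, Sandeep, Neha, Nisha]
    }
}
```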

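The output looks like this (the key order of the grouped map may differ, because groupingBy() makes no guarantee about map ordering):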

[Gaurav, Gaurav, Sandeep, Rami, Jiya, Rajesh, Rampal, Nisha, Neha, Ramesh, Parul, Sunil, Prekha, Neeraj]
{32=[Person [firstName=Rami, lastName=Aggarwal, gender=FEMALE, age=32, salary=12000]], 35=[Person [firstName=Rampal, lastName=Yadav, gender=MALE, age=35, salary=12000]], 20=[Person [firstName=Prekha, lastName=Verma, gender=FEMALE, age=20, salary=3600]], 21=[Person [firstName=Neha, lastName=Kapoor, gender=FEMALE, age=21, salary=5500]], 22=[Person [firstName=Jiya, lastName=Khan, gender=FEMALE, age=22, salary=4500], Person [firstName=Ramesh, lastName=Chander, gender=MALE, age=22, salary=2500]], 24=[Person [firstName=Sandeep, lastName=Shukla, gender=MALE, age=24, salary=5000]], 25=[Person [firstName=Parul, lastName=Mehta, gender=FEMALE, age=25, salary=8500], Person [firstName=Neeraj, lastName=Shah, gender=MALE, age=25, salary=33000]], 26=[Person [firstName=Nisha, lastName=Sharma, gender=FEMALE, age=26, salary=10000]], 27=[Person [firstName=Sunil, lastName=Kumar, gender=MALE, age=27, salary=6875]], 28=[Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000], Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000]], 45=[Person [firstName=Rajesh, lastName=Kumar, gender=MALE, age=45, salary=55000]]}
27.142857142857142
14
IntSummaryStatistics{count=14, sum=380, min=20, average=27.142857, max=45}
Gaurav#Gaurav#Sandeep#Rami#Jiya#Rajesh#Rampal#Nisha#Neha#Ramesh#Parul#Sunil#Prekha#Neeraj

You can't consume the same Stream twice

Once a terminal operation has been invoked on a stream, the stream is considered consumed and can't be used again. Starting a new operation on an already consumed stream throws an IllegalStateException.

Stream<String> stream = lines.stream();
stream.reduce((a, b) -> a.length() > b.length() ? a : b).ifPresent(System.out::println);

// the line below throws an IllegalStateException because the stream has already been consumed
stream.forEach(System.out::println);

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
 at java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
 at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 at com.gauravbytes.java8.stream.StreamExceptionExample.main(StreamExceptionExample.java:18)
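If you need several terminal operations over the same data, one common approach is to create a new stream each time, for example through a Supplier<Stream<T>>. A minimal sketch, with a hypothetical LINES list standing in for the lines collection used above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamSupplierDemo {

    static final List<String> LINES = Arrays.asList("one", "three", "five");

    // Each call to get() builds a brand-new Stream over the same source.
    static final Supplier<Stream<String>> STREAMS = LINES::stream;

    static Optional<String> longest() {
        return STREAMS.get().reduce((a, b) -> a.length() > b.length() ? a : b);
    }

    static long count() {
        return STREAMS.get().count(); // a fresh stream, so no IllegalStateException
    }

    // Reusing a single Stream instance fails on the second terminal operation.
    static boolean reuseFails() {
        Stream<String> once = LINES.stream();
        once.count();
        try {
            once.count();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(longest());    // Optional[three]
        System.out.println(count());      // 3
        System.out.println(reuseFails()); // true
    }
}
```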

Parallelism

Streams provide a convenient way to execute operations in parallel. Parallel streams use the common ForkJoinPool under the hood. You can call parallelStream() on a collection, or parallel() on an existing stream, to run its operations in parallel. Note that parallelism is not automatically faster than sequential execution: it only pays off when there is enough data, and enough processor cores, to outweigh the cost of splitting the work and merging the results.

persons.parallelStream().filter(p -> p.getAge() > 30).collect(Collectors.toList());

To change the parallelism of the common fork-join pool, set the java.util.concurrent.ForkJoinPool.common.parallelism system property at JVM startup, e.g. -Djava.util.concurrent.ForkJoinPool.common.parallelism=8.
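A quick sketch that prints the common pool's parallelism (which reflects the system property above if it was set) and checks that a parallel sum gives the same result as a sequential one. It says nothing about speed; whether the parallel version is faster depends on the data size and the machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class ParallelDemo {

    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    static long parallelSum(long n) {
        // Same pipeline, split across the common ForkJoinPool's worker threads.
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println(sequentialSum(1_000_000)); // 500000500000
        System.out.println(parallelSum(1_000_000));   // 500000500000
    }
}
```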

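Concurrent reductions

Collectors.groupingByConcurrent() is a concurrent, unordered collector that returns a ConcurrentMap. On a parallel stream, all threads can accumulate into a single shared concurrent map instead of building separate maps and merging them afterwards, which can be cheaper for large inputs.

ConcurrentMap<Integer, List<Person>> personByAgeConcurrent = persons.parallelStream().collect(Collectors.groupingByConcurrent(Person::getAge));
System.out.println(personByAgeConcurrent);

Avoid interference, side effects and stateful lambdas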

Side effects

A function has side effects if it does more than consume and/or return a value, for example if it modifies some state. Common examples of side effects are forEach() and mutable reduction using collect(). collect() handles its side effects in a thread-safe manner, even on parallel streams.
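A sketch of the difference, assuming we want the even numbers below n: rather than adding to a shared ArrayList from a parallel forEach(), let collect() build the list.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SideEffectDemo {

    // Avoid: IntStream.range(0, n).parallel().forEach(evens::add) on a shared ArrayList.
    // ArrayList is not thread-safe, so that version can lose elements or throw.
    //
    // Prefer: let collect() build the list; it manages its own containers and merges
    // the partial results, so no shared state is mutated from the lambdas.
    static List<Integer> evens(int n) {
        return IntStream.range(0, n)
                .parallel()
                .filter(i -> i % 2 == 0)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(10)); // [0, 2, 4, 6, 8]
    }
}
```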

Interference

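You should avoid interference in your lambdas/functions. Interference occurs when you modify the stream's source, such as the underlying collection, while the pipeline is running; it can lead to exceptions such as ConcurrentModificationException or to incorrect results.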
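A minimal sketch of interference, with a hypothetical list of names. With an ArrayList source, adding to the list from inside the pipeline causes a ConcurrentModificationException; other collections may behave differently. The non-interfering version writes its results to a new list instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

class InterferenceDemo {

    // Interfering: the lambda adds to the very list the stream is reading from.
    static boolean interferingPipelineFails() {
        List<String> names = new ArrayList<>(Arrays.asList("Neha", "Nisha"));
        try {
            names.stream().forEach(name -> names.add(name + "2"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // ArrayList's spliterator detects the structural modification
        }
    }

    // Non-interfering: read from the source, collect the results into a new list.
    static List<String> nonInterfering() {
        List<String> names = new ArrayList<>(Arrays.asList("Neha", "Nisha"));
        return names.stream().map(name -> name + "2").collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(interferingPipelineFails()); // true
        System.out.println(nonInterfering());           // [Neha2, Nisha2]
    }
}
```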

Stateful Lambda expressions

A lambda expression is stateful if its result depends on state that may change during the execution of the pipeline. Avoid stateful lambda expressions: with parallel streams in particular, their results can vary from run to run. You can read more here.
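One common stateful pattern is numbering elements with a shared counter (say, an AtomicInteger) incremented inside map(). A stateless alternative, sketched below on hypothetical data, derives each label from the element's index, so the result is the same whether the stream runs sequentially or in parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StatelessIndexDemo {

    // Stateful (avoid): map(name -> counter.incrementAndGet() + ". " + name) with a shared
    // counter; in a parallel stream the numbers depend on which thread gets there first.
    //
    // Stateless: each label depends only on its own index, not on processing order.
    static List<String> numbered(List<String> names) {
        return IntStream.range(0, names.size())
                .parallel()
                .mapToObj(i -> (i + 1) + ". " + names.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(numbered(Arrays.asList("Gaurav", "Jiya", "Neha")));
        // [1. Gaurav, 2. Jiya, 3. Neha]
    }
}
```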

I hope you find this post informative and helpful. You can find the example code for reduction, aggregate operations and stream creation on GitHub.
