
In this post, we will cover the following topics.

  • What are Streams?
  • What is a pipeline?
  • Key points to remember for Streams.
  • How to create Streams?

What are Streams?

Java 8 introduced a new package, java.util.stream, which contains classes for performing SQL-like operations on elements. A Stream is a sequence of elements on which you can perform aggregate operations (reduction, filtering, mapping, average, min, max, etc.). It is not a data structure that stores elements, like a collection. Instead, it carries values, often computed lazily, from a source through a pipeline.

What is a pipeline?

A pipeline is a sequence of aggregate (intermediate and terminal) operations on a source. It has the following components.

  • A source: a collection, generator function, array, I/O channel, etc.
  • Zero or more intermediate operations: filter, map, sequential, sorted, distinct, limit, flatMap, parallel, etc. Each intermediate operation returns a new stream.
  • A terminal operation: forEach, reduce, collect, noneMatch, allMatch, count, findFirst, findAny, min, max, etc.
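Here is a minimal sketch of one pipeline with all three parts, run over a list of words. The class name PipelineDemo and the sample words are illustrative and are not from the original example code. The source list is left unchanged because a stream neither stores nor modifies its source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineDemo {
    // source -> intermediate operations -> terminal operation
    static List<String> shortWordsUpperCased(List<String> words) {
        return words.stream()                       // source: a Collection
                .filter(w -> w.length() <= 3)       // intermediate: keep short words
                .map(String::toUpperCase)           // intermediate: transform each word
                .sorted()                           // intermediate: natural order
                .collect(Collectors.toList());      // terminal: triggers the actual work
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "of", "pipeline", "api", "java", "is");
        System.out.println(shortWordsUpperCased(words)); // [API, IS, OF]
        System.out.println(words);                       // the source list is unchanged
    }
}
```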

Key points to remember for Streams

  • No storage.
  • Functional in nature.
  • Laziness-seeking.
  • Possibly unbounded. Short-circuiting operations such as limit(n) or findFirst() allow computations on infinite streams to finish in finite time.
  • Consumable. The elements can be visited only once. To revisit, you need to create a new stream.
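The "possibly unbounded" point can be shown with a short sketch. The class and method names are illustrative. Stream.iterate() produces an endless sequence of powers of two, and the short-circuiting findFirst() stops as soon as an element matches.

```java
import java.util.Optional;
import java.util.stream.Stream;

class InfiniteStreamDemo {
    // Stream.iterate() yields an unbounded stream: 1, 2, 4, 8, ...
    // findFirst() is short-circuiting, so evaluation stops at the first match.
    // Assumes threshold < 2^30; beyond that, n * 2 overflows int and no match is ever found.
    static Optional<Integer> firstPowerOfTwoAbove(int threshold) {
        return Stream.iterate(1, n -> n * 2)
                .filter(n -> n > threshold)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstPowerOfTwoAbove(1000).get()); // 1024
    }
}
```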

How to create Streams?

1. From a Collection, by calling stream() or parallelStream().

Collection<Person> persons = StreamSamples.getPersons();
persons.stream().forEach(System.out::println);

// parallel stream
persons.parallelStream().forEach(System.out::println);

2. From the Stream interface, by calling the static factory method of(), which takes varargs of type T.

Stream.of("This", "is", "how", "you", "create", "stream", "from", "static", "factory",
      "method").map(s -> s.concat(" ")).forEach(System.out::print);

3. From the Arrays class, by calling the static stream() method.

Arrays.stream(new String[] { "This", "is", "how", "you", "create", "stream", ".",
      "Above", "function", "use", "this" }).map(s -> s.concat(" "))
      .forEach(System.out::print);

4. From Stream, by calling iterate(), which returns an infinite stream.

// iterate() returns an infinite stream, so bound it with limit()
Stream.iterate(1, i -> i + 1).limit(10).forEach(System.out::print);
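The second argument of iterate() must return the next element. A tempting but wrong choice is `i -> i++`. Post-increment returns the old value, so the stream never advances. The sketch below compares the two (the class and method names are illustrative).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterateDemo {
    // Correct: the lambda returns i + 1, so the stream counts upwards
    static List<Integer> countFromOne(int n) {
        return Stream.iterate(1, i -> i + 1).limit(n).collect(Collectors.toList());
    }

    // Pitfall: i++ returns the old value of i (it only increments the lambda's
    // local parameter), so every element is 1
    static List<Integer> postIncrementPitfall(int n) {
        return Stream.iterate(1, i -> i++).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countFromOne(5));         // [1, 2, 3, 4, 5]
        System.out.println(postIncrementPitfall(3)); // [1, 1, 1]
    }
}
```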

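5. From IntStream, by calling range() (upper bound exclusive) or rangeClosed() (upper bound inclusive).

// rangeClosed(1, 10) includes 10; range(1, 10) would stop at 9 and give 45
int sumOfFirst10PositiveNumbers = IntStream.rangeClosed(1, 10).reduce(0, Integer::sum);
System.out.println(sumOfFirst10PositiveNumbers); // 55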

6. From Random, by calling ints(), which returns an infinite stream.

// random.ints for random number
new Random().ints().limit(20).forEach(System.out::println);
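Random also has finite, bounded overloads such as ints(streamSize, origin, bound). Combined with a fixed seed, they give reproducible output. A minimal sketch follows; the dice-roll framing and the names are illustrative.

```java
import java.util.Arrays;
import java.util.Random;

class RandomStreamDemo {
    // ints(streamSize, origin, bound) is finite: exactly streamSize values in [origin, bound).
    // The same seed always produces the same sequence.
    static int[] diceRolls(long seed, int count) {
        return new Random(seed).ints(count, 1, 7).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(diceRolls(42L, 10)));
    }
}
```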

7. From BufferedReader, by calling lines(). Streams of file paths can be obtained from methods of the Files class such as Files.list() and Files.walk(). Several other JDK classes also have stream-bearing methods, such as JarFile.stream() and BitSet.stream().

String myValue = "first line\nsecond line"; // sample multi-line input (illustrative)
try (BufferedReader br = new BufferedReader(new StringReader(myValue))) {
  br.lines().forEach(System.out::print);
  System.out.println();
}
catch (IOException io) {
  System.err.println("Got this:>>>> " + io);
}
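The sketch below exercises two of the sources just mentioned: BufferedReader.lines() over in-memory text, and BitSet.stream(), which yields the indices of the set bits as an IntStream. The class and method names are illustrative.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.BitSet;

class OtherSourcesDemo {
    // BufferedReader.lines(): one element per line. The stream is lazy, so it
    // must be consumed before the reader is closed (count() runs inside the try).
    static long countNonBlankLines(String text) {
        try (BufferedReader br = new BufferedReader(new StringReader(text))) {
            return br.lines().filter(line -> !line.trim().isEmpty()).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // BitSet.stream(): an IntStream of the indices whose bits are set
    static int[] setBitIndices(BitSet bits) {
        return bits.stream().toArray();
    }

    public static void main(String[] args) {
        System.out.println(countNonBlankLines("first\n\nsecond\n   \nthird")); // 3
        BitSet bits = new BitSet();
        bits.set(1);
        bits.set(3);
        bits.set(5);
        System.out.println(Arrays.toString(setBitIndices(bits))); // [1, 3, 5]
    }
}
```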

I hope this post is informative and helpful in understanding Streams. You can find the full example code on GitHub.

You can also read about aggregate operations on Streams.

This post continues my earlier posts on Streams. In this post, we will discuss aggregate operations on Streams.

Aggregate operations on Streams

You can perform intermediate and terminal operations on Streams. Intermediate operations return a new stream and are evaluated lazily: they only start running when a terminal operation is called.

persons.stream().filter(p -> p.getGender() == Gender.MALE).forEach(System.out::println);

In the snippet above, filter() does not start filtering immediately; it only creates a new stream. Filtering starts when a terminal operation is called, which in this case is forEach().
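To see this laziness in action, here is a small sketch (class name illustrative) in which the filter() lambda records each call in a log list. The logging is a side effect. It is used here only to make the evaluation order visible on a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Building the pipeline does not invoke the filter lambda yet
        Stream<Integer> evens = Stream.of(1, 2, 3, 4)
                .filter(n -> {
                    log.add("filter " + n);
                    return n % 2 == 0;
                });
        log.add("pipeline built");
        // The terminal operation pulls every element through filter()
        List<Integer> result = evens.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        // prints: pipeline built, filter 1, filter 2, filter 3, filter 4, result [2, 4]
        run().forEach(System.out::println);
    }
}
```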

Intermediate operations

There are many intermediate operations that you can perform on Streams. Some of them are filter(), distinct(), sorted(), limit(), parallel(), sequential(), map() and flatMap().

filter() operation

This takes a Predicate functional interface as its argument. The output stream of this operation contains only those elements that pass the Predicate's conditional check. You can find a nice explanation of Predicates here.

// all the males
List<Person> allMales = persons.stream().filter(p -> p.getGender() == Gender.MALE).collect(Collectors.toList());
System.out.println(allMales);

map() operation

It is a mapping operation. It expects a Function functional interface as its argument. The purpose of the Function is to transform each element from one type to another (the other type could be the same).

// first names of all the persons
List<String> firstNames = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNames);
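The intermediate operations listed earlier include flatMap(), which has no example of its own. The sketch below contrasts it with map(). map() produces exactly one output element per input element, possibly of a different type, while flatMap() turns each element into a stream and flattens the results into one stream. The names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapFlatMapDemo {
    // map(): one output per input; here String -> Integer
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap(): each inner list becomes a stream, and all of them are concatenated
    static List<String> flatten(List<List<String>> groups) {
        return groups.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("a", "bb", "ccc"))); // [1, 2, 3]
        System.out.println(flatten(Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"))));        // [a, b, c]
    }
}
```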

distinct()

It returns a stream of the distinct elements, using equals() under the hood to remove duplicates.

List<String> uniqueFirstNames = persons.stream().map(Person::getFirstName).distinct().collect(Collectors.toList());

System.out.println(uniqueFirstNames);
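Because distinct() relies on equals() (and, in practice, hashCode()), it only removes duplicates of classes that override those methods. For example, if a class like Person did not override them, distinct() would compare references. A minimal sketch using two JDK classes (names illustrative): String overrides equals(), while StringBuilder does not.

```java
import java.util.Arrays;
import java.util.List;

class DistinctDemo {
    // String overrides equals()/hashCode(): equal strings collapse to one element
    static long distinctStrings(List<String> values) {
        return values.stream().distinct().count();
    }

    // StringBuilder keeps Object's identity-based equals(): same content, still distinct
    static long distinctBuilders(List<StringBuilder> values) {
        return values.stream().distinct().count();
    }

    public static void main(String[] args) {
        System.out.println(distinctStrings(Arrays.asList("a", "a", "b"))); // 2
        System.out.println(distinctBuilders(Arrays.asList(
                new StringBuilder("a"), new StringBuilder("a"))));        // 2
    }
}
```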

sorted()

Sorts the stream elements, either in natural order or with a supplied Comparator. It is a stateful operation.

List<Person> sortedByAge = persons.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList());
System.out.println(sortedByAge);
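Comparators can be composed with thenComparing() and reversed(). For ordered streams, sorted() is stable, so elements that compare equal keep their encounter order. A minimal sketch on plain strings (names and data illustrative):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedDemo {
    // Primary key: length; tie-breaker: alphabetical order
    static List<String> byLengthThenAlphabetical(List<String> words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        return words.stream()
                .sorted(byLength.thenComparing(Comparator.naturalOrder()))
                .collect(Collectors.toList());
    }

    // Longest first; equal lengths keep their original order (stable sort)
    static List<String> byLengthDescending(List<String> words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        return words.stream().sorted(byLength.reversed()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fruit = Arrays.asList("pear", "fig", "apple", "kiwi");
        System.out.println(byLengthThenAlphabetical(fruit)); // [fig, kiwi, pear, apple]
        System.out.println(byLengthDescending(fruit));       // [apple, pear, kiwi, fig]
    }
}
```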

limit(n) truncates the stream to at most n elements. Because it is short-circuiting, it is helpful for ending infinite streams in a finite manner.

Intermediate operations can be divided into two groups: stateless and stateful. Most intermediate operations, e.g. map and filter, are stateless. Some are stateful, e.g. distinct, sorted, limit and skip, because they have to keep state about previously seen elements.
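limit() and skip() are both stateful because they must count how many elements have already passed. The sketch below uses them to take a finite window from an infinite stream. The names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LimitSkipDemo {
    // Infinite source 1, 2, 3, ...; keep multiples of three, drop the first 'skip'
    // of them, then stop after 'take' elements
    static List<Integer> multiplesOfThree(int skip, int take) {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % 3 == 0)
                .skip(skip)
                .limit(take)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(multiplesOfThree(0, 4)); // [3, 6, 9, 12]
        System.out.println(multiplesOfThree(2, 3)); // [9, 12, 15]
    }
}
```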

Terminal/ Reduction operations

There are many terminal operations, such as forEach(), reduce(), max(), min(), average(), collect(), findAny(), findFirst(), allMatch(), anyMatch() and noneMatch().

forEach()

This takes a Consumer functional interface as its parameter and passes each element to it for consumption.

persons.stream().forEach(System.out::println);
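On a parallel stream, forEach() may visit elements in any order. forEachOrdered() respects the encounter order and runs the action for one element at a time: the action for each element happens-before the action for the next. Because of that, appending to a plain ArrayList is safe in the sketch below. The names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ForEachOrderedDemo {
    // Parallel evaluation, but the action runs in encounter order, one element at a time
    static List<Integer> collectInOrder(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.rangeClosed(1, n).parallel().boxed().forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectInOrder(10)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```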

max(), min(), average() operations

On an IntStream (obtained here with mapToInt()), average() returns an OptionalDouble, whereas max() and min() return an OptionalInt.

//average age of all persons
persons.stream().mapToInt(Person::getAge).average().ifPresent(System.out::println);

// max age from all persons
persons.stream().mapToInt(Person::getAge).max().ifPresent(System.out::println);

// min age from all persons
persons.stream().mapToInt(Person::getAge).min().ifPresent(System.out::println);
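These methods return optionals because the stream might be empty. On an object Stream<T>, max() and min() need a Comparator and return Optional<T>. A minimal sketch (names illustrative):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.stream.IntStream;

class MinMaxAverageDemo {
    // IntStream.average() returns OptionalDouble, empty when there are no values
    static OptionalDouble averageOf(int... values) {
        return IntStream.of(values).average();
    }

    // Stream<T>.max(Comparator) returns Optional<T>
    static Optional<String> longest(List<String> words) {
        return words.stream().max(Comparator.comparingInt(String::length));
    }

    public static void main(String[] args) {
        System.out.println(averageOf(20, 30, 40).getAsDouble());                  // 30.0
        System.out.println(averageOf().isPresent());                            // false
        System.out.println(longest(Arrays.asList("fig", "banana", "kiwi")).get()); // banana
    }
}
```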

noneMatch(), allMatch(), anyMatch()

These check whether a condition is satisfied by none, all, or any of the stream's elements, respectively, and return a boolean.

// true if every female in the group is younger than 22
boolean allFemalesUnder22 = persons.stream().filter(p -> p.getGender() == Gender.FEMALE).allMatch(p -> p.getAge() < 22);
// true if not a single male is older than 30
boolean noMaleOver30 = persons.stream().filter(p -> p.getGender() == Gender.MALE).noneMatch(p -> p.getAge() > 30);

// true if at least one person is older than 45
boolean anyoneOver45 = persons.stream().anyMatch(p -> p.getAge() > 45);
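Two properties of these methods are easy to miss. On an empty stream, allMatch() and noneMatch() return true and anyMatch() returns false. All three are short-circuiting, so anyMatch() can even finish on an infinite stream once it finds a match. A minimal sketch (names illustrative):

```java
import java.util.stream.IntStream;

class MatchDemo {
    static boolean allEven(int... v)  { return IntStream.of(v).allMatch(n -> n % 2 == 0); }
    static boolean anyEven(int... v)  { return IntStream.of(v).anyMatch(n -> n % 2 == 0); }
    static boolean noneEven(int... v) { return IntStream.of(v).noneMatch(n -> n % 2 == 0); }

    // Short-circuits: stops at the first element greater than limit
    static boolean infiniteHasValueAbove(int limit) {
        return IntStream.iterate(1, n -> n + 1).anyMatch(n -> n > limit);
    }

    public static void main(String[] args) {
        System.out.println(allEven() + " " + anyEven() + " " + noneEven()); // true false true
        System.out.println(infiniteHasValueAbove(100));                     // true
    }
}
```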

Reduction operations

Reduction operations are those that produce a single value as the result. Some of them, such as max(), min() and average(), appeared in the previous snippets, and sum() is another. In addition, Java 8 provides two general-purpose reduction operations: reduce() and collect().

reduce()

// rangeClosed(1, 10) includes 10, so this sums 1..10
int sumOfFirst10 = IntStream.rangeClosed(1, 10).reduce(0, Integer::sum);
System.out.println(sumOfFirst10); // 55
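reduce() comes in three forms:

- With an identity and an accumulator, it always returns a value. That value is the identity for an empty stream.
- With only an accumulator, it returns an Optional.
- With an identity, an accumulator and a combiner, the result type can differ from the element type. The combiner merges partial results when the stream runs in parallel.

A minimal sketch of each (names illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

class ReduceDemo {
    // 1. identity + accumulator: n! (returns the identity 1 when n == 0)
    static int factorial(int n) {
        return IntStream.rangeClosed(1, n).reduce(1, (a, b) -> a * b);
    }

    // 2. accumulator only: Optional, empty for an empty list
    static Optional<String> concatenate(List<String> parts) {
        return parts.stream().reduce((a, b) -> a + b);
    }

    // 3. identity + accumulator + combiner: String elements reduced to an Integer
    static int totalLength(List<String> words) {
        return words.stream().reduce(0, (sum, w) -> sum + w.length(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(factorial(5));                                   // 120
        System.out.println(concatenate(Arrays.asList("a", "b", "c")).get()); // abc
        System.out.println(totalLength(Arrays.asList("a", "bb", "ccc")));     // 6
    }
}
```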

collect()

It performs a mutable reduction. The Collectors class provides many useful collectors, such as toList(), groupingBy(), averagingInt(), counting(), summarizingInt(), mapping() and joining().

Collection<Person> persons = StreamSamples.getPersons();
List<String> firstNameOfPersons = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNameOfPersons);

Map<Integer, List<Person>> personByAge = persons.stream().collect(Collectors.groupingBy(Person::getAge));
System.out.println(personByAge);

Double averageAge = persons.stream().collect(Collectors.averagingInt(Person::getAge));
System.out.println(averageAge);

Long totalPersons = persons.stream().collect(Collectors.counting());
System.out.println(totalPersons);

IntSummaryStatistics personsAgeSummary = persons.stream().collect(Collectors.summarizingInt(Person::getAge));

System.out.println(personsAgeSummary);

String allPersonsFirstName = persons.stream().collect(Collectors.mapping(Person::getFirstName, Collectors.joining("#")));
System.out.println(allPersonsFirstName);

The result would look like this.

[Gaurav, Gaurav, Sandeep, Rami, Jiya, Rajesh, Rampal, Nisha, Neha, Ramesh, Parul, Sunil, Prekha, Neeraj]
{32=[Person [firstName=Rami, lastName=Aggarwal, gender=FEMALE, age=32, salary=12000]], 35=[Person [firstName=Rampal, lastName=Yadav, gender=MALE, age=35, salary=12000]], 20=[Person [firstName=Prekha, lastName=Verma, gender=FEMALE, age=20, salary=3600]], 21=[Person [firstName=Neha, lastName=Kapoor, gender=FEMALE, age=21, salary=5500]], 22=[Person [firstName=Jiya, lastName=Khan, gender=FEMALE, age=22, salary=4500], Person [firstName=Ramesh, lastName=Chander, gender=MALE, age=22, salary=2500]], 24=[Person [firstName=Sandeep, lastName=Shukla, gender=MALE, age=24, salary=5000]], 25=[Person [firstName=Parul, lastName=Mehta, gender=FEMALE, age=25, salary=8500], Person [firstName=Neeraj, lastName=Shah, gender=MALE, age=25, salary=33000]], 26=[Person [firstName=Nisha, lastName=Sharma, gender=FEMALE, age=26, salary=10000]], 27=[Person [firstName=Sunil, lastName=Kumar, gender=MALE, age=27, salary=6875]], 28=[Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000], Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000]], 45=[Person [firstName=Rajesh, lastName=Kumar, gender=MALE, age=45, salary=55000]]}
27.142857142857142
14
IntSummaryStatistics{count=14, sum=380, min=20, average=27.142857, max=45}
Gaurav#Gaurav#Sandeep#Rami#Jiya#Rajesh#Rampal#Nisha#Neha#Ramesh#Parul#Sunil#Prekha#Neeraj
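A few more Collectors patterns are often useful next to the ones above:

- groupingBy() with a map factory and a downstream collector. A TreeMap gives sorted keys, unlike the HashMap order in the output above.
- partitioningBy(), which always produces the keys false and true.
- toMap() with a merge function for duplicate keys.

A minimal sketch on plain strings (names and data illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class CollectorsDemo {
    // word length -> number of words with that length, keys sorted
    static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream().collect(
                Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    // true -> words of at most maxLen characters, false -> the rest
    static Map<Boolean, List<String>> splitByLength(List<String> words, int maxLen) {
        return words.stream().collect(Collectors.partitioningBy(w -> w.length() <= maxLen));
    }

    // first letter -> words joined with ","; the merge function handles duplicate keys
    static Map<Character, String> joinByFirstLetter(List<String> words) {
        return words.stream().collect(Collectors.toMap(
                w -> w.charAt(0), w -> w, (a, b) -> a + "," + b, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(countByLength(Arrays.asList("a", "bb", "cc", "ddd")));          // {1=1, 2=2, 3=1}
        System.out.println(splitByLength(Arrays.asList("a", "bbb", "cc"), 2));             // {false=[bbb], true=[a, cc]}
        System.out.println(joinByFirstLetter(Arrays.asList("apple", "avocado", "banana"))); // {a=apple,avocado, b=banana}
    }
}
```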

You can't consume the same Stream twice

Once a terminal operation has been executed on a stream, the stream is considered consumed and can't be used again. If you try to start a new operation on an already consumed stream, you will get an IllegalStateException.

Stream<String> stream = lines.stream();
stream.reduce((a, b) -> a.length() > b.length() ? a : b).ifPresent(System.out::println);

// below line will throw the exception
stream.forEach(System.out::println);
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
 at java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
 at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 at com.gauravbytes.java8.stream.StreamExceptionExample.main(StreamExceptionExample.java:18)
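If you need to traverse the same data more than once, create a new stream each time. A common approach is to keep a Supplier<Stream<T>> around. A minimal sketch showing both the failure and the workaround (names illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamReuseDemo {
    // A second terminal operation on the same stream throws IllegalStateException
    static boolean secondUseFails(List<String> words) {
        Stream<String> stream = words.stream();
        stream.count();             // first terminal operation consumes the stream
        try {
            stream.count();         // second use of the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Workaround: ask a Supplier for a fresh stream for every traversal
    static long[] countLongAndShort(List<String> words) {
        Supplier<Stream<String>> fresh = words::stream;
        long longWords = fresh.get().filter(w -> w.length() > 3).count();
        long shortWords = fresh.get().filter(w -> w.length() <= 3).count();
        return new long[] { longWords, shortWords };
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("java", "is", "fun", "streams", "go");
        System.out.println(secondUseFails(words));                        // true
        System.out.println(Arrays.toString(countLongAndShort(words)));    // [2, 3]
    }
}
```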

Parallelism

Streams provide a convenient way to execute operations in parallel. Under the hood, parallel streams run on the common ForkJoinPool. You can call parallelStream() on a collection, or parallel() on an already created stream, to run the pipeline in parallel. Note that parallelism is not automatically faster than running tasks serially. It pays off only when you have enough data and processor cores.

persons.parallelStream().filter(p -> p.getAge() > 30).collect(Collectors.toList());
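
To change the parallelism of the common fork-join pool (for example, to increase it), set the java.util.concurrent.ForkJoinPool.common.parallelism system property at JVM startup, e.g. -Djava.util.concurrent.ForkJoinPool.common.parallelism=8.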
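A parallel run gives the same result as a sequential one only when the operation is associative, as sum() is. The sketch below compares the two and prints the common pool's parallelism. The names are illustrative, and no timing claims are made.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class ParallelDemo {
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline, executed on the common ForkJoinPool
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println(parallelSum(1_000_000));                                  // 500000500000
        System.out.println(sequentialSum(1_000_000) == parallelSum(1_000_000));     // true
    }
}
```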

Concurrent reductions

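A concurrent reduction is performed when the stream is parallel, the collector is concurrent, and either the stream or the collector is unordered. Collectors.groupingByConcurrent() returns such a collector, and its result is a ConcurrentMap.

ConcurrentMap<Integer, List<Person>> personByAgeConcurrent = persons.parallelStream().collect(Collectors.groupingByConcurrent(Person::getAge));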
System.out.println(personByAgeConcurrent);
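groupingByConcurrent() also accepts a downstream collector, just like groupingBy(). In a concurrent reduction, all threads accumulate into one shared ConcurrentMap instead of building per-thread maps and merging them. A minimal sketch (names and data illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class ConcurrentGroupingDemo {
    // Parallel stream + CONCURRENT, UNORDERED collector => concurrent reduction
    static ConcurrentMap<Integer, Long> countByLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByLength(Arrays.asList("a", "bb", "cc", "ddd")));
    }
}
```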

Avoid interference, side effects and stateful lambdas/functions

Side effects

A function is said to have side effects if it does more than consume and/or return a value, for example by modifying some state. Common examples are forEach() and mutable reduction with collect(). Java performs the mutation inside collect() in a thread-safe manner, even on parallel streams.
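In practice, prefer letting collect() build the result over mutating a shared collection from forEach(). The discouraged version appears only as a comment below, because ArrayList is not thread-safe and running it could lose elements or throw. The names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SideEffectDemo {
    // Discouraged (shown as a comment only, not executed):
    //   List<Integer> evens = new ArrayList<>();
    //   IntStream.rangeClosed(1, 1000).parallel().filter(n -> n % 2 == 0).forEach(evens::add);
    // Concurrent add() calls on an ArrayList can lose elements or throw.

    // Preferred: collect() accumulates safely, even on a parallel stream,
    // and keeps the encounter order for this ordered source
    static List<Integer> evensUpTo(int n) {
        return IntStream.rangeClosed(1, n).parallel()
                .filter(i -> i % 2 == 0)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evensUpTo(20)); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```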

Interference

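You should avoid interference in your lambdas/functions. Interference occurs when you modify the stream's underlying source (for example, the collection) while the pipeline is running. Unless the source is a concurrent collection, this can cause exceptions such as ConcurrentModificationException, incorrect answers, or other nonconformant behavior.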
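A minimal sketch of interference on an ArrayList source, together with a non-interfering alternative that builds a new list (names illustrative). For ArrayList, the JDK detects the modification and throws ConcurrentModificationException once traversal ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

class InterferenceDemo {
    // Interference: the lambda adds to the very list the stream is reading
    static boolean modifyingSourceFails() {
        List<String> names = new ArrayList<>(Arrays.asList("a", "b", "c"));
        try {
            names.stream().forEach(n -> names.add(n + "!"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Non-interfering: leave the source alone and collect into a new list
    static List<String> withSuffix(List<String> names) {
        return names.stream().map(n -> n + "!").collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(modifyingSourceFails());                 // true
        System.out.println(withSuffix(Arrays.asList("a", "b")));    // [a!, b!]
    }
}
```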

Stateful Lambda expressions

A lambda expression is stateful if its result depends on any state that might change during the execution of the pipeline. Avoid stateful lambda expressions. You can read more here.
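A typical stateful lambda tracks "seen" elements in a shared set. On a parallel stream, which occurrence of a duplicate passes the filter depends on thread timing, so the output order can vary from run to run. The stateless distinct() gives the same first-occurrence result deterministically. A minimal sketch (names illustrative) follows. Only the sequential stateful version is exercised, and its result is deterministic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class StatefulLambdaDemo {
    // Stateful: the filter's answer depends on what 'seen' already contains.
    // Deterministic only because this stream is sequential.
    static List<Integer> firstOccurrencesStateful(List<Integer> values) {
        Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
        return values.stream()
                .filter(seen::add)          // add() returns true only for a new value
                .collect(Collectors.toList());
    }

    // Stateless alternative: distinct() keeps first occurrences, stable for ordered streams
    static List<Integer> firstOccurrences(List<Integer> values) {
        return values.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 1, 3);
        System.out.println(firstOccurrencesStateful(values)); // [1, 2, 3]
        System.out.println(firstOccurrences(values));         // [1, 2, 3]
    }
}
```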

I hope you find this post informative and helpful. You can find the example code for reduction, aggregate operations and stream creation on GitHub.