Saturday 10 September 2016

Understanding Java 8 Stream API

Background

Java 8 has introduced a new set of APIs involving streams. They are very powerful in terms of processing, and they use the functional programming style we have seen in the last couple of posts (refer to the links in the Related Links section at the bottom of this post). In this post we will see what these streams are and how we can leverage them.

A stream in Java is essentially a sequence of elements on which you can chain operations together. Such a chain is called a pipeline. A stream pipeline consists of 3 parts -

  1. Source : Think of it as the data set that is used to generate a stream. Depending on the data set, a stream can be finite or infinite.
  2. Intermediate operations : Intermediate operations are operations that you perform on the stream to filter or process your data. You can have as many intermediate operations as you like. Each intermediate operation returns a new stream, so you can chain more intermediate operations on it. Since streams use lazy evaluation, the intermediate operations do not run until the terminal operation runs.
  3. Terminal operation : This actually produces a result (or a side effect such as printing). There can be only one terminal operation per pipeline. Since a stream can be used only once, it is no longer valid after its terminal operation has run.
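To make the three parts concrete, here is a minimal sketch using only the JDK. The class and method names are made up for this example. It labels the source, the intermediate operations and the terminal operation, and then shows what happens when a stream is reused after its terminal operation: the JDK throws an IllegalStateException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineParts {

    // Source -> intermediate operations -> terminal operation.
    static List<String> shortNamesUpperCase(List<String> names) {
        return names.stream()                    // 1. source
                .filter(n -> n.length() <= 4)    // 2. intermediate: keep short names
                .map(String::toUpperCase)        // 2. intermediate: transform each name
                .collect(Collectors.toList());   // 3. terminal: produce the result
    }

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean reuseFails() {
        Stream<String> stream = Stream.of("a", "b", "c");
        stream.count();                          // first terminal operation consumes the stream
        try {
            stream.count();                      // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;                         // "stream has already been operated upon or closed"
        }
    }

    public static void main(String[] args) {
        System.out.println(shortNamesUpperCase(Arrays.asList("Ram", "John", "Aniket"))); // [RAM, JOHN]
        System.out.println(reuseFails());                                                 // true
    }
}
```

If you need to process the same data twice, create a fresh stream from the source each time.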





NOTE : Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

Intermediate vs. terminal operations



Creating a Stream

You can create Streams in one of the following ways -

        Stream<String> emptyStream = Stream.empty();
        Stream<Integer> singleElementStream = Stream.of(1);
        Stream<Integer> streamFromArray = Stream.of(1,2,3,4);
        List<String> listForStream = Arrays.asList("ABC","PQR","XYZ");
        Stream<String> streamFromList = listForStream.stream();
        Stream<Double> randomInfiniteStream = Stream.generate(Math::random);
        Stream<Integer> sequencedInfiniteStream = Stream.iterate(1, n -> n+1);



The first statement creates an empty stream. The second creates a stream with one element. The third creates a stream containing multiple elements. streamFromList is created from an existing List. The last two statements generate infinite Streams: Stream.generate() takes a Supplier that produces each element, whereas Stream.iterate() takes a seed (the first element to start with) and a UnaryOperator that computes each next element from the previous one.

If you try to print out an infinite stream, your program will keep printing until you terminate it. You can try -

        sequencedInfiniteStream.forEach(System.out::println);
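To consume only part of an infinite stream, you can cut it down with limit() before the terminal operation. A small sketch (the class and method names are just for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FiniteFromInfinite {

    // Takes only the first n values of the infinite sequence 1, 2, 3, ...
    static List<Integer> firstN(int n) {
        return Stream.iterate(1, x -> x + 1)
                .limit(n)                        // short-circuits the infinite source
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints 1 to 5 and then terminates
        Stream.iterate(1, x -> x + 1).limit(5).forEach(System.out::println);
        // Prints 3 random numbers; only the count is predictable
        Stream.generate(Math::random).limit(3).forEach(System.out::println);
        System.out.println(firstN(5)); // [1, 2, 3, 4, 5]
    }
}
```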

Terminal and intermediate Stream operations

We will not go into the details of each terminal and intermediate stream operation. Instead, I will list them and then look at some examples.

Common terminal operations
  1. allMatch()/anyMatch()/noneMatch()
  2. collect()
  3. count()
  4. findAny()/findFirst()
  5. forEach()
  6. min()/max()
  7. reduce()
Common intermediate operations
  1. filter()
  2. distinct()
  3. limit() and skip()
  4. map()
  5. sorted()
  6. peek()

NOTE : Notice how min(), max(), findFirst() and findAny() return Optional values. This is because the stream might be empty, so there may be no value to return.
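To make some of these terminal operations more concrete, here is a small sketch. The numbers and the class and method names are made up for illustration. It shows the boolean-returning match operations, count(), reduce() with an identity value, and the Optional results of min() and findFirst().

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TerminalOps {

    // reduce with an identity value returns a plain value
    static int sum(List<Integer> xs) {
        return xs.stream().reduce(0, Integer::sum);
    }

    // min() returns an Optional because the stream might be empty
    static Optional<Integer> min(List<Integer> xs) {
        return xs.stream().min(Integer::compare);
    }

    // findFirst() also returns an Optional: there may be no match at all
    static Optional<Integer> firstAbove(List<Integer> xs, int limit) {
        return xs.stream().filter(n -> n > limit).findFirst();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(4, 8, 15, 16, 23, 42);

        System.out.println(numbers.stream().anyMatch(n -> n % 2 != 0)); // true (15 and 23)
        System.out.println(numbers.stream().allMatch(n -> n > 0));      // true
        System.out.println(numbers.stream().noneMatch(n -> n < 0));     // true
        System.out.println(numbers.stream().count());                   // 6

        System.out.println(sum(numbers));                                // 108
        System.out.println(min(numbers).get());                          // 4
        System.out.println(firstAbove(numbers, 100).orElse(-1));         // -1 (no match)
        System.out.println(min(Arrays.<Integer>asList()).isPresent());   // false
    }
}
```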

Now let's start with how to print a Stream's contents, because that's what we usually do when in doubt.

You can print a Stream in one of the following ways -

        List<String> listForStream = Arrays.asList("ABC","PQR","XYZ");
        Stream<String> streamFromList = listForStream.stream();
        //printing using forEach terminal operation
        streamFromList.forEach(System.out::println);
        //recreate stream as stream once operated on is invalid
        streamFromList = listForStream.stream();
        //printing using peek intermediate operation; count() is avoided since Java 9+ may skip peek when the size is known
        streamFromList.peek(System.out::println).collect(Collectors.toList());
        streamFromList = listForStream.stream();
        //printing using collect terminal operation
        System.out.println(streamFromList.collect(Collectors.toList()));


The first approach uses the forEach terminal operation to print out the Stream. It takes a Consumer as the argument, which in this case is "System.out::println". We have used a method reference here because that's common, but the corresponding lambda expression would be "s -> System.out.println(s)".
The second approach uses peek, which is an intermediate operation, to look at the stream elements. It also takes a Consumer as the argument, but because it is an intermediate operation it only runs when a terminal operation pulls the elements through. Lastly, we have used the collect terminal operation to collect the results into a List and then print it out. You can define your own Collectors or use the ones Java provides in the java.util.stream.Collectors class. For example, here we have used Collectors.toList().

Note that if you have an infinite Stream, these print approaches will never finish and you will have to terminate the program manually.
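Besides Collectors.toList(), the Collectors class has several other ready-made collectors. The sketch below uses joining(), toSet() and groupingBy(). The class and method names and the sample words are only for illustration, and the TreeMap is there just so the grouped keys print in sorted order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CollectorsDemo {

    // joining(delimiter, prefix, suffix) builds a single String
    static String joined(List<String> words) {
        return words.stream().collect(Collectors.joining(", ", "[", "]"));
    }

    // groupingBy with a TreeMap factory so the keys (lengths) come out sorted
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ABC", "PQR", "XYZ", "HELLO", "ABC");
        System.out.println(joined(words));     // [ABC, PQR, XYZ, HELLO, ABC]
        Set<String> unique = words.stream().collect(Collectors.toSet());
        System.out.println(unique.size());     // 4 (the duplicate "ABC" is dropped)
        System.out.println(byLength(words));   // {3=[ABC, PQR, XYZ, ABC], 5=[HELLO]}
    }
}
```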

Also note that you should not modify the underlying data source directly while a Stream over it is being processed. So -

        List<String> listForStream = new ArrayList<>(Arrays.asList("ABC","PQR","XYZ"));
        Stream<String> streamFromList = listForStream.stream();
        streamFromList.forEach(elm -> listForStream.remove(elm));
        System.out.println(listForStream);


will give you -

Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at HelloWorld.main(HelloWorld.java:34)


because you are iterating over the List and modifying it at the same time. Instead, you can filter the stream and collect the result into a new List -

        List<String> listForStream = Arrays.asList("ABC","PQR","XYZ");
        Stream<String> streamFromList = listForStream.stream();
        listForStream = streamFromList.filter(x -> x.contains("A")).collect(Collectors.toList());
        System.out.println(listForStream);


You will get - [ABC]
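If the goal really is to remove elements from the list itself rather than build a filtered copy, Collection.removeIf (added in Java 8) does that safely without a stream. A minimal sketch, with a made-up class and method name; note that it needs a modifiable list such as an ArrayList, because the fixed-size list returned by Arrays.asList cannot remove elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoveInPlace {

    // Removes, in place, every element that does not contain "A".
    static List<String> keepOnlyWithA(List<String> list) {
        list.removeIf(x -> !x.contains("A"));
        return list;
    }

    public static void main(String[] args) {
        List<String> listForStream = new ArrayList<>(Arrays.asList("ABC", "PQR", "XYZ"));
        keepOnlyWithA(listForStream);
        System.out.println(listForStream); // [ABC]
    }
}
```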

Understanding flatMap() intermediate operation

This is an interesting intermediate operation, so it is covered separately. Its signature is as follows -
  • <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
It takes each element of the stream it is called on and maps it to a separate stream. The stream produced for an element may contain zero, one or many elements, depending on how the mapping function is written. Finally, all of these streams are flattened into a single stream that contains the elements of every one of them. E.g. -

    public static void main(String[] args) {
        Stream<String> stream = Stream.of("I", "Am", "Groot");
        Stream<String> flattenStream = stream.flatMap(s -> Stream.of(s.toUpperCase()));
        System.out.println(flattenStream.collect(Collectors.toList()));
    }


Output :
[I, AM, GROOT]
Explanation:
flatMap takes each element of the stream and converts it into another stream, something like -
"I" -> Stream.of("I")
"Am" -> Stream.of("AM")
"Groot" -> Stream.of("GROOT")
and then flattens them
-> Stream.of("I", "AM", "GROOT") and returns it.
The above is only meant to show how it works in this case; do not take it as the actual implementation.
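In the example above every element maps to a one-element stream, so map() would give the same result. flatMap() is really useful when an element maps to several elements, or to none. A small sketch, with made-up class and method names: the first method splits each word into its letters, and the second flattens a list of lists, where an empty inner list simply contributes nothing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapWords {

    // Each word becomes a stream of its letters; flatMap merges them into one stream.
    static List<String> letters(Stream<String> words) {
        return words
                .flatMap(word -> word.chars().mapToObj(c -> String.valueOf((char) c)))
                .collect(Collectors.toList());
    }

    // Each inner list becomes a stream; an empty list adds no elements.
    static List<Integer> flatten(List<List<Integer>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(letters(Stream.of("I", "Am", "Groot")));
        // [I, A, m, G, r, o, o, t]
        System.out.println(flatten(Arrays.asList(Arrays.asList(1, 2), Arrays.<Integer>asList(), Arrays.asList(3))));
        // [1, 2, 3]
    }
}
```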

In this way you can merge Streams or Lists. E.g. -

    public static void main(String[] args) {
        List<String> dcHeros = Arrays.asList("Superman","Batman","Flash","Constantine");
        List<String> marvelHeros = Arrays.asList("Hulk","Ironman","Thor","Captain America");
        List<String> awesomeness = Stream.of(dcHeros.stream(),marvelHeros.stream()).flatMap(s -> s).collect(Collectors.toList());
        System.out.println(awesomeness);
    }


Output :
[Superman, Batman, Flash, Constantine, Hulk, Ironman, Thor, Captain America]
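For exactly two streams, Stream.concat(a, b) is an alternative to the flatMap approach. It keeps the order of both streams. For more than two, the Stream.of(...).flatMap(s -> s) pattern avoids nesting concat calls. A short sketch with a made-up class name and a shortened hero list:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatHeroes {

    // Stream.concat lazily joins two streams: all of first, then all of second.
    static List<String> merge(List<String> first, List<String> second) {
        return Stream.concat(first.stream(), second.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> dcHeros = Arrays.asList("Superman", "Batman");
        List<String> marvelHeros = Arrays.asList("Hulk", "Thor");
        System.out.println(merge(dcHeros, marvelHeros)); // [Superman, Batman, Hulk, Thor]
    }
}
```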

Examples of Streams usage

Let's now look at some common usage examples -

Let's say you have a list of names. You want to get all names from that list that start with "A", sort them by name and return the first 3 of them.

        List<String> listForStream = Arrays.asList("Aniket", "Amit", "Ram", "John", "Anubhav", "Kate", "Aditi");
        Stream<String> streamFromList = listForStream.stream();
        streamFromList
        .filter(x -> x.startsWith("A"))
        .sorted()
        .limit(3)
        .forEach(System.out::println);



You will get :

Aditi
Amit
Aniket

Let's see what we did here. First we got a stream from the List, then we added a filter to keep only those elements which start with "A". Next we call sorted, which sorts the elements remaining in the stream. This uses the natural (alphabetical) ordering, since the elements are Strings. Lastly, we limit the stream to 3 entries and print them.
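A few more intermediate operations from the list above fit naturally into the same pipeline. The sketch below adds a duplicate "Amit" to the names, removes it with distinct(), sorts with a Comparator instead of the natural order, and drops the first result with skip(). The class and method names are only for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class MoreIntermediateOps {

    static List<String> process(List<String> names) {
        return names.stream()
                .filter(x -> x.startsWith("A"))
                .distinct()                          // removes duplicates (based on equals())
                .sorted(Comparator.reverseOrder())   // reverse alphabetical instead of natural order
                .skip(1)                             // drops the first remaining element
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Aniket", "Amit", "Ram", "John", "Anubhav", "Kate", "Aditi", "Amit");
        System.out.println(process(names)); // [Aniket, Amit, Aditi]
    }
}
```

Here the names starting with "A" are, after distinct(), Aniket, Amit, Anubhav and Aditi; in reverse order Anubhav comes first, and skip(1) drops it.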

Now guess what the following code does -

        Stream.iterate(1, n -> n+1)
        .filter(x -> x%5==0)
        .limit(5)
        .forEach(System.out::println);


And the output is -
5
10
15
20
25

First, we create an infinite Stream using iterate. It generates the sequence 1, 2, 3, 4, 5 and so on. Next we apply a filter to keep only multiples of 5. Then we limit the stream to the first 5 such results, which turns our infinite stream into a finite one. Lastly we print out those 5 results.
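The position of limit() in the pipeline matters. In this sketch (class and method names are made up), putting limit(5) before the filter means only 1 to 5 ever reach the filter, so only 5 survives. Also be careful with the original order: if the filter could never produce five matches, limit() would never be satisfied and the pipeline would keep pulling elements from the infinite source.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitPosition {

    static List<Integer> filterThenLimit() {
        return Stream.iterate(1, n -> n + 1)
                .filter(x -> x % 5 == 0)   // multiples of 5 from the infinite stream
                .limit(5)                  // the first five of those
                .collect(Collectors.toList());
    }

    static List<Integer> limitThenFilter() {
        return Stream.iterate(1, n -> n + 1)
                .limit(5)                  // only 1..5 pass through
                .filter(x -> x % 5 == 0)   // of those, only 5 is a multiple of 5
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filterThenLimit()); // [5, 10, 15, 20, 25]
        System.out.println(limitThenFilter()); // [5]
    }
}
```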

Now let's move on to using peek -

        Stream.iterate(1, n -> n+1)
        .filter(x -> x%5==0)
        .peek(System.out::println)
        .limit(5)
        .forEach(System.out::println);


What would the above code snippet print? The answer is -
5
5
10
10
15
15
20
20
25
25

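So each element is printed twice: once by peek() right after the filter, and once by forEach() after the limit. The two prints are interleaved because each element travels through the whole pipeline before the next one is generated.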

NOTE : A Stream never modifies its source collection unless you explicitly change it yourself from within the stream. See the following example -

        List<String> myList  = new ArrayList<String>();
        myList.add("a");
        myList.add("b");
        myList.add("b");
        myList.add("d");
        List<String> newMyList = myList.stream().map(str -> str + "a").collect(Collectors.toList());
        System.out.println(myList);
        System.out.println(newMyList);


Output of which is -
[a, b, b, d]
[aa, ba, ba, da]

Also, to reiterate: a Stream does not actually run until its terminal operation is invoked; it is lazily evaluated. So something like -
  • countriesList.stream().filter(s -> s.startsWith("I"))
will just return a stream and do nothing.
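You can observe this laziness by recording which elements the filter's predicate actually sees. In this sketch the country names, the class name and the startsWithI helper are all hypothetical, and the side effect inside the predicate is there only for demonstration. Nothing is recorded until collect() runs, and a short-circuiting terminal operation such as findFirst() stops as soon as it has its answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {

    // Hypothetical helper: builds the pipeline and records each element the predicate sees.
    static Stream<String> startsWithI(List<String> countries, List<String> seen) {
        return countries.stream().filter(s -> {
            seen.add(s);                 // side effect only to observe evaluation
            return s.startsWith("I");
        });
    }

    public static void main(String[] args) {
        List<String> countriesList = Arrays.asList("India", "Japan", "Italy", "Iran");

        List<String> seen = new ArrayList<>();
        Stream<String> pipeline = startsWithI(countriesList, seen);
        System.out.println("Before terminal op: " + seen);  // []
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("After terminal op: " + seen);   // [India, Japan, Italy, Iran]
        System.out.println(result);                         // [India, Italy, Iran]

        List<String> seenByFindFirst = new ArrayList<>();
        startsWithI(countriesList, seenByFindFirst).findFirst();
        System.out.println(seenByFindFirst);                // [India]
    }
}
```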

Working with primitives and Stream

Similarly, there are specialized Streams for primitives.
There are three types of primitive streams:
  • IntStream: Used for the primitive types int, short, byte, and char
  • LongStream: Used for the primitive type long
  • DoubleStream: Used for the primitive types double and float
IntStream and LongStream additionally have range() and rangeClosed() methods. The call range(1, 100) creates a stream of the primitives from 1 to 99, whereas rangeClosed(1, 100) creates a stream of the primitives from 1 to 100. The primitive streams also have math operations including average(), max(), and sum(). There is one more additional method, summaryStatistics(), to get many statistics in one call.

E.g.
private static int range(IntStream ints) {
    // count, min, max, sum and average are all gathered in a single pass
    IntSummaryStatistics stats = ints.summaryStatistics();
    if (stats.getCount() == 0) throw new RuntimeException();
    return stats.getMax() - stats.getMin();
}
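Here is a small sketch of the other primitive-stream operations mentioned above: range() versus rangeClosed(), average() returning an OptionalDouble, and mapToInt() for turning a Stream<String> into an IntStream. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.IntStream;

public class PrimitiveStreams {

    static int sumRange(int from, int to) {
        return IntStream.range(from, to).sum();          // 'to' is excluded
    }

    static int sumRangeClosed(int from, int to) {
        return IntStream.rangeClosed(from, to).sum();    // 'to' is included
    }

    public static void main(String[] args) {
        System.out.println(sumRange(1, 100));            // 4950 (1..99)
        System.out.println(sumRangeClosed(1, 100));      // 5050 (1..100)

        // average() returns OptionalDouble because the stream may be empty
        OptionalDouble avg = IntStream.of(2, 4, 6).average();
        System.out.println(avg.getAsDouble());           // 4.0

        // mapToInt converts a Stream<String> into an IntStream
        List<String> words = Arrays.asList("I", "Am", "Groot");
        IntSummaryStatistics stats = words.stream().mapToInt(String::length).summaryStatistics();
        System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getSum()); // 1 5 8
    }
}
```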


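There are also functional interfaces specialized for primitives, such as IntPredicate, IntUnaryOperator and ToIntFunction in java.util.function, which the primitive streams use to avoid boxing.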
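A sketch of a few of these primitive-specialized functional interfaces being passed to stream methods. The particular interfaces, constants and class name are just examples: IntStream.filter() takes an IntPredicate, IntStream.map() takes an IntUnaryOperator, and Stream.mapToInt() takes a ToIntFunction, so no int is boxed into an Integer along the way.

```java
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PrimitiveFunctions {

    static final IntPredicate IS_EVEN = n -> n % 2 == 0;         // int -> boolean
    static final IntUnaryOperator SQUARE = n -> n * n;           // int -> int
    static final ToIntFunction<String> LENGTH = String::length;  // String -> int

    // Sum of the squares of the even numbers in [1, n]
    static int sumOfEvenSquares(int n) {
        return IntStream.rangeClosed(1, n).filter(IS_EVEN).map(SQUARE).sum();
    }

    // Total number of characters across all words
    static int totalLength(Stream<String> words) {
        return words.mapToInt(LENGTH).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquares(6));                        // 4 + 16 + 36 = 56
        System.out.println(totalLength(Stream.of("I", "Am", "Groot"))); // 8
    }
}
```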



Parallel Streams

Streams have built-in support for multithreading. There are two ways you can create a parallel stream -
  1. Call parallel() on an existing stream to convert into a parallel stream (as an intermediate operation) OR
  2. You can directly call parallelStream() on your collection object to get a parallel stream.
The 2nd way is used more often. Now let's see the difference between a serial and a parallel stream -

Consider following example -

    public static void main(String[] args) {
       
        System.out.println("Using a Serial Stream : ");
        Arrays.asList(1,2,3,4,5).stream().forEach(System.out::println);
        System.out.println("Using a Parallel Stream : ");
        Arrays.asList(1,2,3,4,5).parallelStream().forEach(System.out::println);
           
    }


One possible output is -

Using a Serial Stream :
1
2
3
4
5
Using a Parallel Stream :
3
1
4
2
5

The reason for saying "one possible output" is that for a parallel stream you cannot predict the order. By default the work is split across the threads of the common ForkJoinPool, so it is a bit like printing each number in a separate task submitted to a thread pool.

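NOTE : Parallel streams can process elements independently, although the order in which they are processed cannot be determined ahead of time. Also, if your lambdas add results to a shared collection from a parallel stream, that collection must be thread-safe (for example, a concurrent collection). It is usually simpler to avoid shared mutable state and let collect() assemble the results.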
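If the order does matter, a parallel stream can still give ordered results. In this sketch (class and method names are made up), forEachOrdered() prints in encounter order at the cost of some parallelism. collect() with Collectors.toList() also keeps the encounter order of an ordered source such as a List, and it combines the per-thread partial results itself, so no shared concurrent collection is needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelOrder {

    // The map runs in parallel, but collect() preserves the List's encounter order.
    static List<Integer> doubled(List<Integer> input) {
        return input.parallelStream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // forEach: order not guaranteed in parallel
        numbers.parallelStream().forEach(System.out::println);
        // forEachOrdered: prints 1 2 3 4 5 in order
        numbers.parallelStream().forEachOrdered(System.out::println);
        System.out.println(doubled(numbers)); // [2, 4, 6, 8, 10]
    }
}
```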


NOTE : Any stream operation that is based on order, including findFirst(), limit(), or skip(), may actually perform more slowly in a parallel environment. This is a result of a parallel processing task being forced to coordinate all of its threads in a synchronized-like fashion.
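As an illustration of this trade-off, here is a sketch with made-up class and method names. findFirst() on a parallel stream must still return the first match in encounter order. findAny() makes no such promise, which lets the stream return whichever match a thread finds first when you don't actually need the first one.

```java
import java.util.stream.IntStream;

public class FindInParallel {

    // findFirst must respect encounter order, so it always returns the first match.
    static int firstMultipleOf7() {
        return IntStream.rangeClosed(1, 1_000).parallel()
                .filter(n -> n % 7 == 0)
                .findFirst()
                .getAsInt();
    }

    // findAny may return whichever match is found first by any thread.
    static int anyMultipleOf7() {
        return IntStream.rangeClosed(1, 1_000).parallel()
                .filter(n -> n % 7 == 0)
                .findAny()
                .getAsInt();
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleOf7()); // always 7
        System.out.println(anyMultipleOf7());   // some multiple of 7, not necessarily 7
    }
}
```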

Related Links
