《Java8实战》学习笔记(一)——关于Lambda和Stream

本来看到《Effective Java》第七章看不太懂,买了这本书,花了一星期看完觉得很好。安利一波,Manning出版社的in action系列真的好。
现在看第二遍,温故知新,并结合动手实现一些书上的代码,去写这篇学习笔记。
本篇为《Java8实战》的Chapter1-7的学习总结。

关于Java8

  1. 纵向看,现在Java14都有了,一味的追逐JDK版本没有什么意义,Java8算是里程碑的版本,也是生产力版本。
  2. 横向看,编程语言是一个生态系统,某种语言的兴衰受到很多因素的影响,比如计算机形态、硬件能力、数据形态,这些都让一代又一代程序员去进行自然选择。
  3. 面向现在,编程语言生态系统越来越多的出现大数据这种气候,那更便捷的进行数据操作,适应这种新兴的气候,就成为了一个重要的编程任务和理念,“要么改变,要么衰亡”。
  4. Java8中出现了三个新的编程概念:
    • 流(Stream)和流处理(Stream Pipeline)
    • 用行为参数化把代码传递给方法,代码就是参数的一种。
    • 并行与共享的可变数据。代码中不访问共享的可变数据,这种一般被称为纯函数、无副作用函数或无状态函数。
  5. 没有共享的可变数据和将行为和函数船体给其他方法的能力是函数式编程范式的基石。
  6. Java8最大的变化是它开始远离那种侧重改变现有值的经典面向对象的思想,而向函数式编程领域转变。两种思想看似冲突,但对编程语言来说,目的是获得两种编程范式中最好的东西。

一、Lambda

1.基本概念

Java中的函数

编程语言中的函数一般指方法,和数学函数定义类似。
对Java来说,函数成为了值的一种新形式。
Java原有的值包括:1.基本类型的原始值int、double、char等;2.对象的引用,比如new一个东西,数组也是对象的一种。这些值一般被称为一等值或者一等公民。
现在,方法和Lambda(匿名方法)可以称为值的一种,也在Java8中从二等公民上升到了一等公民。
于是,函数式编程风格一般也就是指“编写把函数作为一等值的程序”。

Java中的流

流衍生于集合。
Collection集合是外部迭代,你需要去自己去做迭代,怎么去做循环然后处理元素。
Stream是内部迭代,不要去操心迭代循环的事情。

Java中的default默认方法

Java8中添加default关键字,主要是为了支持库设计师,写出更容易改进的接口。
比如List新的stream方法,Collection API中并没有,现在想给list或者set添加一个stream方法,如果直接在Collection接口添加一个方法,然后实现的相关类都去进行实现,那代价是很大的,其他很多框架将会变的立马不能用。
于是,出现了一个进退两难的问题,如何不破坏已发布的接口而不破坏原有的实现?
答案就是接口需要包含实现类没有提供实现的方法签名,缺失的方法随接口直接实现提供,而不是由实现类进行提供,即默认实现default方法。

2.通过行为参数化传递代码

唯一不变的就是变化。用户的需求永远都是在变的。
行为参数化就是可以帮助处理频繁变更需求的一种开发设计模式。

行为参数化:将代码块作为一个参数去传递给另外一个方法。你可以先准备好一段代码块,交给另外一个方法,而这个代码块以后会被你程序的其他部分调用,此时代码块才会被执行。

例子:筛选苹果

比如,开始让你筛选绿苹果,然后让你筛选各种颜色的苹果,再让你筛选重量在某个区间的苹果,甚至苹果产地这个属性也要参与筛选,最后各种组合筛选、复杂的查询。如果每一种都去实现一个方法,会出现各种打破DRY的情况(Don't Repeat Yourself)。
下面一步一步的解决这个问题。

第一步,高层抽象来看,这其实都是根据苹果的某些属性来返回一个boolean的值。那我们可以用一个判断yes or no的谓词进行代替,像下面这样,有点像设计模式的策略模式。

    interface ApplePredicate {
        boolean test(Apple a);
    }

    static class AppleWeightPredicate implements ApplePredicate {
        public boolean test(Apple apple) {
            return apple.getWeight() > 150;
        }
    }

    static class AppleColorPredicate implements ApplePredicate {
        public boolean test(Apple apple) {
            return "green".equals(apple.getColor());
        }
    }

    static class AppleRedAndHeavyPredicate implements ApplePredicate {
        public boolean test(Apple apple) {
            return "red".equals(apple.getColor())
                    && apple.getWeight() > 150;
        }
    }

第二步,只需要一个方法承接各种策略即可,比如下面的filter方法。细想一下,filter方法和各种策略的配合即完成了“行为参数化”这一理念,让方法接受多种行为(策略)作为参数,并在内部使用,完成不同的行为。多种行为,一个参数。

    public static List<Apple> filter(List<Apple> inventory, ApplePredicate p) {
        List<Apple> result = new ArrayList<>();
        for (Apple apple : inventory) {
            if (p.test(apple)) {
                result.add(apple);
            }
        }
        return result;
    }
    
        // [Apple{color='green', weight=80}, Apple{color='green', weight=155}]
        List<Apple> greenApples2 = filter(inventory, new AppleColorPredicate());
        System.out.println(greenApples2);

        // [Apple{color='green', weight=155}]
        List<Apple> heavyApples = filter(inventory, new AppleWeightPredicate());
        System.out.println(heavyApples);

        // []
        List<Apple> redAndHeavyApples = filter(inventory, new AppleRedAndHeavyPredicate());
        System.out.println(redAndHeavyApples);

第三步,其实上面已经做到了很好的设计,一个参数就可以实现多种行为,但是使用起来不友好,需要很多类就是在用的那一下进行实例化,而且只需要那一次的实例化。我们可以用匿名类去同时声明并进行实例化这个类,随用随建

        List<Apple> redApples2 = filter(inventory, new ApplePredicate() {
            public boolean test(Apple a) {
                return a.getColor().equals("red");
            }
        });

第四步,匿名类的语法还是很啰嗦,这里只有一个抽象方法,用lambda式更为方便,代码会比以前干净整洁好多。
List greenApples2 = filter(inventory, (Apple a) -> "green".equals(a.getColor()));

第五步,用泛型抽象化可以做更多的事情,不仅用于苹果,只要是个列表就可以进行类似的过滤。

public interface Predicate<T> {
    boolean test(T t);
}

public static <T> List<T> filter(List<T> list, Predicate<T> predicate) {
        List<T> result = new ArrayList<>();
        for (T item : list) {
            if (predicate.test(item)) {
                result.add(item);
            }
        }
        return result;
    }

至此,在灵活性和间接性之间找到了最佳的平衡点。

一些真实的例子:

        List<Integer> integerList = new ArrayList<>();
        integerList.sort((a, b) -> a.compareTo(b));
        Thread t = new Thread(() -> System.out.println("aaa"));
        Button button = new Button();
        button.addActionListener((e) -> System.out.println());

3.Lambda表达式

Lambda表达式可以看作是“简洁地表示可传递的匿名函数的一种方式”。没有名称,有参数列表、函数主题和返回类型,可能还会有异常列表。
Lambda表达式由参数、箭头和主题组成。其语法可以概括为:
(parameters) -> expression 或 (parameters) -> {statements;}
注意,return属于控制流语句,不会是表达式,肯定是大花括号。表达式一般会隐含return隐含关键字。

在哪里以及如何使用Lambda

函数式接口:只定义一个抽象方法的接口(可以有很多默认方法)。Lambda表达式可以直接将整个表达式作为函数式接口的一个具体实现的实例。
Java8理念更新——“用单个抽象方法的接口是特别的,应该得到特别的对待”。

函数描述符:函数式接口的抽象方法的签名就是Lambda表达式的签名,这个抽象方法就是函数描述符,这也是类型检查的保证。例如:
()->void 代表参数列表为空,并返回void的函数,
(Apple,Apple)->int 代表接受两个Apple作为参数返回int的函数。

@FunctionalInterface 该注解表示该接口会被设计成函数式接口。

Lambda实践小例子

资源处理(文件或者数据库)的一种常见情况是:打开一个资源做一些处理然后关闭资源。除了处理的代码,其他代码总是类似的,这就是所谓的环绕执行模式。
下面这段代码是典型的环绕执行,核心代码就是其读一行文件内容这个行为。

public static String processFileLimited() throws IOException {
        try (BufferedReader br =
                     new BufferedReader(new FileReader("src/main/resources/lambdasinaction/chap3/data.txt"))) {
            return br.readLine();
        }
    }

如果是要其他行为呢?比如读前两行,读最后一行什么的。
好像又回到上面的那句话,即多种行为一个参数,这不正是“行为参数化”的典型情况。
1.首先行为参数化,抽象来看,可以用 (BufferReader)->String 这一参数描述符来概括。
2.然后,根据函数描述符定义函数式接口,比如我们定义为BufferedReaderProcessor接口。我们新建一个processFile方法承接这个函数式接口。

    public interface BufferedReaderProcessor {
        String process(BufferedReader b) throws IOException;
    }
    
    public static String processFile(BufferedReaderProcessor p) throws IOException {
        //需要实现
    }

3.当行为已经通过processFile方法作为参数传进来,现在就是需要执行这个参数所代表的行为。

    public static String processFile(BufferedReaderProcessor p) throws IOException {
        try (BufferedReader br = new BufferedReader(new FileReader("src/main/resources/lambdasinaction/chap3/data.txt"))) {
            return p.process(br);
        }

    }

4.最后就可以通过传递不同的Lambda就可以让processFile方法以不同的方式处理文件了。

String oneLine = processFile((BufferedReader b) -> b.readLine());
String twoLines = processFile((BufferedReader b) -> b.readLine() + b.readLine());

4.Java8中的函数式接口

Java8的设计师本身就根据常见的几种函数描述符设计了一套函数式接口。
Java本身就只有9种函数描述符,但是由于泛型只能绑定在引用类型,不能直接使用原始类型,Java本身利用装箱(boxing)和拆箱(unboxing)以及自动装箱机制去完成,但是本身性能代价很大。
因为装箱本身把原始类型包装起来,因此装箱需要更多的值,于是函数式接口直接对于原始类型特化了很多专门的版本,避免自动装箱。
具体函数式接口参见文章:Java8常用的函数式接口以及函数描述符

使用函数式接口遇见异常

任何函数式接口都不允许抛出checked异常。如果你需要lambda表达式抛出异常,两种方式。

  1. 自己定义一个函数式接口,并声明受检异常,像上面的BufferedReaderProcessor接口就扔出了IOException这个checked异常。
  2. 把lambda包在一个try/catch块中。
Function<BufferedReader, String> f = bufferedReader -> {
            try {
                return bufferedReader.readLine();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };

5.类型检查和推断

就像泛型作为语法糖需要进行类型推断一样,省略了更多信息的lambda表达式当然更为需要。
Lambda表达式的类型是从使用Lambda的上下文推断出来的。上下文包括(接受它传递的方法参数,接受值的局部变量),重点是函数描述符。

同样的Lambda,在不同的上下文的情况下因为会被推断成不同的函数式接口,所以会出现下面的情况,特殊的是void兼容的问题,如果一个lambda的主题是一个语句表达式,他就和返回void的函数描述符兼容,所以下面虽然add返回是boolean值,但是放在Consumer这个接口下面,也不存在任何问题。

    @Test
    void lambda() {
        Callable<Integer> c = () -> 42;
        PrivilegedAction<Integer> a = () -> 42;

        //void兼容规则
        List<String> list = new ArrayList<>();
        Predicate<String> p = s -> list.add(s);
        Consumer<String> b = s -> list.add(s);
    }

由于参数类型可以被推断出来所以,lambda表达式中的参数类型可以被省略。

List<Apple> greenApples = filter(inventory, (Apple a) -> "green".equals(a.getColor()));
List<Apple> greenApples = filter(inventory, a -> "green".equals(a.getColor()));

6.使用局部变量

Lambda可以使用局部变量,限制是要么直接final修饰,要么事实上是最终的(代码不改变其值)
其原因是局部变量存储在栈中(单一线程持有),而对象实例放在堆上,常量、静态变量放在方法区或者持久代上,后两者都是线程之间共享的。
而Lambda本身是在一个单独的线程内使用,所以一个线程如果分配某个变量给lambda的线程以后又去赋值这个变量,就会出现不确定错误,因此直接无法编译。

7.方法引用

方法引用可以让你重复使用现有的方法进行定义,并像Lambda一样进行传递。
对象引用是使用new关键字进行创建然后进行传递,而方法引用是使用“目标::方法”进行创建。

方法引用的几种类型

  1. 静态方法的引用。(args) -> ClassName.staticMethod(args)等价于ClassName:ClassName.staticMethod。这种最好理解,例如Integer.parseInt()等价于Integer::parseInt.
  2. 目标出现在实现方法参数中。(arg0, rest) -> arg0.instanceMethod(rest)等价于ClassName.instanceMethod。例如(s1,s2)->s1.compareToIgnoreCase(s2)等价于String::compareToIgnoreCase。
  3. 目标是已经存在的外部对象。(args) -> expr.instanceMethod(args)等价于expr::instanceMethod
  4. 构造方法的引用。利用类名和new关键字进行引用,ClassName::new。记住上面提到的类型推断,不同的上下文情况下,同样的方法引用会代表不同的lambda表达式,因为它对应了不同的构造方法。
        Supplier<Apple> s1 = Apple::new;
        Supplier<Apple> s2 = () -> new Apple();
        s1.get();

        Function<String, Apple> f1 = Apple::new;
        Function<String, Apple> f2 = s -> new Apple(s);
        f1.apply("aa");

        BiFunction<Integer, String, Apple> bf1 = Apple::new;
        BiFunction<Integer, String, Apple> bf2 = ((integer, s) -> new Apple(integer, s));
        bf1.apply(1, "sss");

8.Lambda表达式的复合使用

复合即使得多个简单的Lambda表达式组合成复杂的表达式。Java中一切都是对象,所以f.apply(a)虽然数学意义上是f(a),但是面向对象还是不能让函数完全独立。

a.比较器的复合

Comparator接口的comparing方法可以用于集合的比较,还可以使用reversed进行逆序,以及thenComparing实现比较器链。

inventory.sort(comparing(Apple::getWeight)
                .reversed()
                .thenComparing(Apple::getColor));

b.谓词的复合

谓词复合主要是与或非,分别对应and,or和negate。

Predicate<Apple> p = apple -> apple.weight > 130;
Predicate<Apple> p2 = p.and(apple -> apple.getColor().equals("aa")).negate().or(apple -> apple.getColor().equals("aaa"));

c.函数的复合

函数的复合主要是f函数和g函数是f(g(x))还是g(f(x))的问题,前者是f.andThen(g),后者是f.compose(g)

二、Stream

1.基本概念

流是Java API的新成员,主要特点是以声明式的方式内部迭代区处理数据集合,而且还加入了并行处理。此外pipeline流水线的思想是的代码清晰可读。

三大特点

1.声明式——更简洁,更易读
2.可复合——更灵活
3.可并行——性能更好

流和集合

Java8中直接支持从各种集合中利用stream方法返回一个流。

集合是数据结构,主要目的是以特定的时间/空间复杂度存储和访问元素。
流的目的在于表达计算,只是会使用一个提供数据的源,可能来自于集合,也可能是数组以及输入输出流。

1.两者的区别就在于对数据和计算之间的关系的差异。前者是一个内存中的数据结构,后者则只是概念上固定的数据结构。前者是供应商驱动的,供应商来了一批货,开始去卖这批货。后者是需求驱动的,生产者-消费者的关系,消费者要买,立马生产卖掉
2.流是时间意义上分布的一组值;集合是空间中分布的一组值,在一个时间点同时存在。
3.流和迭代器一样,只能遍历一次。使用终端操作遍历完之后,我们就说这个流已经被消费掉了,可以从原始数据源获得一个新的流。
4.流使用内部迭代的好处,可以更好的进行并行处理,或者用更优化的顺序进行处理,如果使用外部迭代很难进行优化。

2.流水线pipeline

pipeline,流水线或者流管道。
Stream接口中定义了许多操作,主要可以分为中间操作和终端操作。
a.中间操作会返回一个流,可以让流连接起来
b.终端操作的结果不是流,把流给消费关闭掉,比如forEach的结果返回是void,count返回是int等。

流的三件事

1.数据源,产生流
2.中间操作链,形成流水线
3.终端操作,执行流水线,生成结果。
流的流水线背后类似于构造器设计模式,最后的build是终端操作,前面的调用链是中间操作。

3.流实战

筛选和切片

filter(Predict):筛选主要是用filter方法,接收谓词作为参数,返回一个符合谓词条件的流。
distinct():去重,筛选出元素各异的流。
limit(n):截断流,会返回一个不超过给定长度的流。
skip(n):跳过元素,返回一个扔掉了前n个元素的流,如果不足n个,返回空流。

映射

map:接受一个函数,这个函数会应用到流的每个元素上,输出映射后元素类型的流。映射的意思是对于通过的每一个元素都会根据函数创建一个新的元素放到输出流中,而不是修改这个元素。
flatMap:流的扁平化,让你把一个流中的每个值都换成另一个流,然后把所有流连接起来成为一个流。idea会对每次转换过后的结果进行提示,可以发现区别。

匹配

数据处理经常发生的情况是看看数据集中的某些元素是否匹配一个给定的属性。
anyMatch:流中是够有一个元素能匹配给定的谓词
allMatch:流中的元素是否都能匹配给定的谓词
noneMatch:流中的元素不存在和谓词匹配的元素
这三个操作都是短路操作。

查找

findAny:返回当前流中的任何一个元素,返回结果用Optional进行包装。
findFirst:返回流中的第一个元素。
两者的区别在于并行,找到第一个需要保证原始顺序,对并行性能有影响,如果不关心第一个,使用findAny。

归约

归约操作,是指将流归约成一个值,函数式编程通常称为折叠。
reduce:归约操作。第一个参数是初始值,第二个参数是具体的结合归约的操作,第一个参数不提供时由于结果可能为空,则返回是Optional包装的值。可以利用reduce实现sum,count,max,min等归约函数。

int sum2 = numbers.stream().reduce(0, Integer::sum);
int max = numbers.stream().reduce(0, (a, b) -> Integer.max(a, b));
Optional<Integer> min = numbers.stream().reduce(Integer::min);
int count = menu.stream().map(dish -> 1).reduce(Integer::sum).get();

4.数值流

三种原始数据类型特化流:IntStream,DoubleStream,LongStream
上面的map方法会重新生成Stream < T > 流,直接是无法使用sum等归约方法的,因此出现了上面三种特化的数值流。

对象流映射成数值流:mapToInt、mapToDouble、mapToLong方法去替换map方法。
数值流转换成对象流:boxed()方法。比如Stream stream = intStream.boxed()就进行装箱转换了。
mapToObj:返会指定的对象流
区分空流和默认值:OptionalInt、OptionalDouble和OptionalLong

        int calories = menu.stream()
                .mapToInt(Dish::getCalories)
                .sum();

        // max and OptionalInt
        OptionalInt maxCalories = menu.stream()
                .mapToInt(Dish::getCalories)
                .max();
                

数值范围

数值流中可以产生数值范围。在IntStream和LongStream中有range和rangeClosed这两个静态方法,前者不包含结束值,后者closed是包含结束值的。

    IntStream evenNumbers = IntStream.rangeClosed(1, 100)
                .filter(n -> n % 2 == 0);
          
    //勾股数打印      
    Stream<int[]> pythagoreanTriples = IntStream.rangeClosed(1, 100).boxed()
            .flatMap(a -> IntStream.rangeClosed(a, 100)
            .filter(b -> Math.sqrt(a * a + b * b) % 1 == 0).boxed()
            .map(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)}));

    pythagoreanTriples.forEach(t -> System.out.println(t[0] + ", " + t[1] + ", " + t[2]));
    
    Stream<double[]> pythagoreanTriples2 = IntStream.rangeClosed(1, 100)
            .boxed()
            .flatMap(a -> IntStream.rangeClosed(a, 100)
                    .mapToObj(b -> new double[]{a, b, Math.sqrt(a * a + b * b)})
                    .filter(n -> n[2] % 2 == 0));

5.构建流

  1. 由值构建流,Stream.of
  2. 由数组构建流,Arrays.stream(numbers)
  3. 由文件生成流,java NIO的相关方法。
  4. 由函数生成流,iterate函数接受两个参数,初始值和UnaryOperator类型的表达式,依次使用表达式生成;generate函数接受Supplier表达式提供新的值。
        // Stream.of
        Stream<String> stream = Stream.of("Java 8", "Lambdas", "In", "Action");
        stream.map(String::toUpperCase).forEach(System.out::println);

        // Stream.empty
        Stream<String> emptyStream = Stream.empty();

        // Arrays.stream
        int[] numbers = {2, 3, 5, 7, 11, 13};
        System.out.println(Arrays.stream(numbers).sum());
        
        //NIO API
        long uniqueWords = Files.lines(Paths.get("lambdasinaction/chap5/data.txt"), Charset.defaultCharset())
                                 .flatMap(line -> Arrays.stream(line.split(" ")))
                                 .distinct()
                                 .count();
        // fibonnaci with iterate     
        Stream.iterate(new int[]{0, 1}, t -> new int[]{t[1],t[0] + t[1]})
              .limit(10)
              . map(t -> t[0])
              .forEach(System.out::println);

        // random stream of doubles with Stream.generate
        Stream.generate(Math::random)
              .limit(10)
              .forEach(System.out::println);

6.用流收集数据——collect操作

collect也是一个归约的终端操作。
传递给collect的参数必须是Collector的接口的实现。
Collectors实用类提供了很多静态工厂方法,方便的创建常见收集器的实例。

7.Collectors中的静态方法

工厂方法 返回类型 用于
toList List<T>  把流中所有项目收集到一个 List
使用示例: List<Dish> dishes = menuStream.collect(toList());
toSet Set<T> 把流中所有项目收集到一个 Set ,删除重复项
使用示例: Set<Dish> dishes = menuStream.collect(toSet());
toCollection Collection<T> 把流中所有项目收集到给定的供应源创建的集合
使用示例: Collection<Dish> dishes = menuStream.collect(toCollection(ArrayList::new));
counting Long 计算流中元素的个数
使用示例: long howManyDishes = menuStream.collect(counting());
summingInt Integer 对流中项目的一个整数属性求和
使用示例: int totalCalories = menuStream.collect(summingInt(Dish::getCalories));
averagingInt Double 计算流中项目 Integer 属性的平均值
使用示例: double avgCalories = menuStream.collect(averagingInt(Dish::getCalories));
summarizingInt IntSummaryStatistics 收集关于流中项目 Integer 属性的统计值,例如最大、最小、总和与平均值
使用示例: IntSummaryStatistics menuStatistics = menuStream.collect(summarizingInt(Dish::getCalories));
joining String 连接对流中每个项目调用 toString 方法所生成的字符串,有个分隔符的重载版本
使用示例: String shortMenu = menuStream.map(Dish::getName).collect(joining(","));
maxBy Optional<T> 一个包裹了流中按照给定比较器选出的最大元素的 Optional,或如果流为空则为 Optional.empty()
使用示例: Optional<Dish> fattest = menuStream.collect(maxBy(comparingInt(Dish::getCalories)));
minBy Optional<T> 一个包裹了流中按照给定比较器选出的最小元素的 Optional,或如果流为空则为 Optional.empty()
使用示例: Optional<Dish> lightest = menuStream.collect(minBy(comparingInt(Dish::getCalories)));
reducing 归约操作产生的类型 从一个作为累加器的初始值开始,利用 BinaryOperator 与流中的元素逐个结合,从而将流归约为单个值
使用示例: int totalCalories = menuStream.collect(reducing(0, Dish::getCalories, Integer::sum));
collectingAndThen 转换函数返回的类型 包裹另一个收集器,对其结果应用转换函数
使用示例: int howManyDishes = menuStream.collect(collectingAndThen(toList(), List::size));
groupingBy Map<K, List<T>> 根据项目的一个属性的值对流中的项目作分组,并将属性值作为结果 Map 的键
使用示例: Map<Dish.Type,List<Dish>> dishesByType = menuStream.collect(groupingBy(Dish::getType));
partitioningBy Map<Boolean,List<T>> 根据对流中每个项目应用谓词的结果来对项目进行分区
使用示例: Map<Boolean,List<Dish>> vegetarianDishes = menuStream.collect(partitioningBy(Dish::isVegetarian));

8.Collector接口实现

目标:实现一个收集质数的接口
先理解Collector接口的五个方法,直接用代码来看:

    public static class PrimeNumbersCollector
            implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {

        /**
         * @return 创建一个空的容器,质数与合数分别用false或者true来识别
         */
        @Override
        public Supplier<Map<Boolean, List<Integer>>> supplier() {
            return () -> new HashMap<Boolean, List<Integer>>() {{
                put(true, new ArrayList<>());
                put(false, new ArrayList<>());
            }};
        }

        /**
         * @return 对一个数是否是质数进行判断,然后将结果添加到结果容器对应的列表中
         */
        @Override
        public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
            return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
                acc.get(isPrime(acc.get(true),
                        candidate))
                        .add(candidate);
            };
        }

        /**
         * @return 合并并行处理的各个结果容器
         */
        @Override
        public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
            return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
                map1.get(true).addAll(map2.get(true));
                map1.get(false).addAll(map2.get(false));
                return map1;
            };
        }

        @Override
        public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
            return Function.identity();
        }

        /**
         * @return 定义收集器的行为,通常是UNORDERED, CONCURRENT, IDENTITY_FINISH这三个项目的枚举,
         * 第一个是当认为数据源是无序的,比如Set,就可以添加这个特性,否则不应该添加该枚举值。因为该特性不承诺保存的顺序和元素出现的顺序一致。
         * 第二个是默认多线程(前提是得到并行流)可以并行调用accumulator函数并且最终能返回正确的结果。
         * 多线程(前提是得到并行流)其实操作的是同一个结果容器,这个时候就需要开发者自己保证多线程操作同一个结果容器的准确性。
         * 此时combiner不会执行,因为操作的是同一个结果容器。
         * 当Set<Characteristics>中不包含CONCURRENT时, 并行流在调用collect方法时操作的是多个不同的结果容器,并且一定会执行combiner方法返回的函数式接口实例。
         * 第三个是中间结果和返回结果一致时才可以调用,出现时finish函数不会执行;
         * Set中包含了IDENTITY_FINISH枚举值时,我们自己就需要保证中间结果类型就是最终的结果类型,否则就会强制类型转化失败,代码会抛出ClassCastException异常。
         */
        @Override
        public Set<Characteristics> characteristics() {
            return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
        }
    }

9.并行流

并行流指的是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

parallel:将顺序流转换为并行流
sequential:将并行流转换为顺序流
两者可以混合使用,但是整个流水线看起来是什么样是由最后一次调用决定的。

并行流内部使用ForkJoinPool去实现,默认的线程数量是处理器数量。

高效使用并行流

  1. 顺序流转换成并行流轻而易举,但并不一定是好事,即不一定快,也不一定符合预期,一定要多测量保证不出问题。
  2. 装箱操作非常慢,尽量用特化的原始类型流进行操作。
  3. 有些操作本来就不适合并行,比如limit操作和findFirst操作
  4. 一个元素通过流水线的时间越长,使用并行流性能好的可能性越大。
  5. 数据量较小,就别用并行流,因为这还赶不上并行化找到的代价。
  6. 要考虑流背后的数据结构是不是利用分解。比如LinkedList需要遍历才能分解成数据块,就不合适并行。还有Stream.iterate产生的流也是不利于分解。
  7. combiner操作如果代价很大,也不利于并行。
  8. 中间操作可能会影响流的性能要考虑,比如filter之后数据量很小就不要并行。

10.分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池ForkJoinPool的工作线程。

要把任务提交到线程池,必须创建RecursiveTask < R >的一个子类,R是最后产生的结果类型。创建这个类只要实现compute这个抽象方法。下面是求和的实现。

@Override
    protected Long compute() {
        int length = end - start;
        //如果划分出的区块长度小于阈值,就直接计算
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        //创建一个子任务为数组的前一半求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        //fork操作,异步执行新的子任务
        leftTask.fork();
        //创建另一个子任务为数据的后一半求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        //同步执行第二个子任务,有可能会进一步被划分
        Long rightResult = rightTask.compute();
        //读取第一个子任务的结果,如果尚未完成就等待
        Long leftResult = leftTask.join();
        //两个子任务的结果组合
        return leftResult + rightResult;
    }

高效使用分支/合并框架

  1. 对一个任务调用join方法会阻塞调用方,直到该任务做出结果,因此有必要在两个子任务的计算都开始之后再调用它,不然另一个子任务要等要到这个join完成才开始执行会很慢。
  2. 不要在RecursiveTask内部使用invoke方法,应该直接使用fork或者compute,invoke实现使用的join顺序执行的。
  3. 你当然可以两个都fork,然后都join,但是这通常没有一个fork,一个compute的效率高,因为这样做可以为其中一个子任务重用同一个呈现,避免线程池分配线程的开销。
  4. 调试比较麻烦,要积累经验。
  5. 和并行流一样,这个框架也不是天生的快,框架的相关代码需要执行几遍预热过好才会被JIT优化,对比时要注意。
  6. 分支/合并的策略还有一个关键就是上面的阈值,什么时候顺序求值会比较合理。
  7. 由于工作窃取(work stealing)机制的存在,即完成任务的线程会从未完成任务的线程中任务队列偷来任务去执行,这一机制也决定了分出大量的小任务一般来说都是一个好的选择。

11.Spliterator

Spliterator时Java8新的一个接口,名字代表可分的迭代器(Splitable iterator)。这是专门为了并行执行而设计的。这个真的复杂,太容易出错,先略吧,等到必须得用的时候再好好研究一下吧。

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class WordCount {

    public static final String SENTENCE =
            " Nel   mezzo del cammin  di nostra  vita " +
            "mi  ritrovai in una  selva oscura" +
            " che la  dritta via era   smarrita ";

    public static void main(String[] args) {
        System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
        System.out.println("Found " + countWords(SENTENCE) + " words");
    }

    public static int countWordsIteratively(String s) {
        int counter = 0;
        boolean lastSpace = true;
        for (char c : s.toCharArray()) {
            if (Character.isWhitespace(c)) {
                lastSpace = true;
            } else {
                if (lastSpace) counter++;
                lastSpace = Character.isWhitespace(c);
            }
        }
        return counter;
    }

    public static int countWords(String s) {
        //Stream<Character> stream = IntStream.range(0, s.length())
        //                                    .mapToObj(SENTENCE::charAt).parallel();
        Spliterator<Character> spliterator = new WordCounterSpliterator(s);
        Stream<Character> stream = StreamSupport.stream(spliterator, true);

        return countWords(stream);
    }

    private static int countWords(Stream<Character> stream) {
        WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                                                WordCounter::accumulate,
                                                WordCounter::combine);
        return wordCounter.getCounter();
    }

    private static class WordCounter {
        private final int counter;
        private final boolean lastSpace;

        public WordCounter(int counter, boolean lastSpace) {
            this.counter = counter;
            this.lastSpace = lastSpace;
        }

        public WordCounter accumulate(Character c) {
            if (Character.isWhitespace(c)) {
                return lastSpace ? this : new WordCounter(counter, true);
            } else {
                return lastSpace ? new WordCounter(counter+1, false) : this;
            }
        }

        public WordCounter combine(WordCounter wordCounter) {
            return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
        }

        public int getCounter() {
            return counter;
        }
    }

    private static class WordCounterSpliterator implements Spliterator<Character> {

        private final String string;
        private int currentChar = 0;

        private WordCounterSpliterator(String string) {
            this.string = string;
        }

        /**
         * @param action
         * @return
         *
         * 接收 WordCounter::accumulate 函数处理当前字符
         */
        @Override
        public boolean tryAdvance(Consumer<? super Character> action) {
            action.accept(string.charAt(currentChar++));
            return currentChar < string.length();
        }

        /**
         * @return
         * 核心方法,首先设定拆分下限,找到空格创建新的Spliterator
         */
        @Override
        public Spliterator<Character> trySplit() {
            int currentSize = string.length() - currentChar;
            if (currentSize < 10) {
                return null;
            }
            for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
                if (Character.isWhitespace(string.charAt(splitPos))) {
                    Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
                    currentChar = splitPos;
                    return spliterator;
                }
            }
            return null;
        }

        @Override
        public long estimateSize() {
            return string.length() - currentChar;
        }

        @Override
        public int characteristics() {
            return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
        }
    }
}

发表评论

电子邮件地址不会被公开。