Java8| Stream
by Botao Xiao
Stream
从支持数据处理操作的源生成的元素序列。
- 元素序列:流提供了一个接口,可以访问特定元素类型的一组有序值。集合的目的在于以特定的时间/空间复杂度存储和访问元素,流的目的是为了计算。
- 源:流会使用一个提供数据的源,如集合,数组或输入输出资源。从有序集合生成流时会保存原有的顺序。 数据处理操作:filter,map, reduce, find, match, sort等操作,都是线程安全的。
- 流水线:很多流操作都会返回一个流,这样多个操作就能连接起来形成流水线。
- 内部迭代:流的迭代操作是隐式的,并不像集合需要显式的迭代器。
Stream的操作
- filter(), 流水线中的过滤操作。接收一个Predicate对象进行filter
/** *Returns a stream consisting of the elements of this stream that match *the given predicate. */ Stream<T> filter(Predicate<? super T> predicate);
- map(), 接收一个Lambda,将元素转换成其他形式或者提取信息。
/** *Returns a stream consisting of the results of applying the given *function to the elements of this stream. */ <R> Stream<R> map(Function<? super T, ? extends R> mapper);
- limit(), 截断流,使其元素不超过给定数量。
/** *Returns a stream consisting of the elements of this stream, truncated *to be no longer than {@code maxSize} in length. */ Stream<T> limit(long maxSize);
- collect(), 将流转换为其他的形式,例如collect(toList())将流转换成一个链表。 ```Java
<R, A> R collect(Collector<? super T, A, R> collector);
### 使用流
流的使用分成三件事:
1. 一个数据源(如集合):执行一个查询。
2. 中间操作链:形成一条流水线。
3. 一个终端操作:执行流水线,生成结果。
#### 使用谓词筛选 filter()方法
```Java
/**
*Returns a stream consisting of the elements of this stream that match
*the given predicate.
*@return the new stream
*/
Stream<T> filter(Predicate<? super T> predicate);
Example:
menu.parallelStream()
.filter(Dish::isVegetarian)
.forEach(System.out::println);
筛选各异的元素 distinct()
/**
*Returns a stream consisting of the distinct elements (according to
*{@link Object#equals(Object)}) of this stream.
*@return the new stream
*/
Stream<T> distinct();
Example:
list.parallelStream()
.distinct()
.forEach(System.out::println);
截断流 limit(),只会返回固定数值个元素。
/**
*Returns a stream consisting of the elements of this stream, truncated
*to be no longer than {@code maxSize} in length.
*/
Stream<T> limit(long maxSize);
menu.parallelStream()
.filter((d) -> d.getCalories() > 300)
.limit(3)
.forEach(System.out::println);
跳过元素, skip(n), 会跳过当前流水线中前n个元素。一般和limit结合起来使用。
menu.parallelStream()
.filter((d) -> d.getCalories() > 300)
.skip(2)
.forEach(System.out::println);
对流中的每一个元素应用函数map()
/**
*Returns a stream consisting of the results of applying the given
*function to the elements of this stream.
*@return the new stream
*/
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
流的扁平化flatMap,实际上就是将数组中的每一个对象生成一个流,即将流扁平化。
/**
*Returns a stream consisting of the results of replacing each element of
*this stream with the contents of a mapped stream produced by applying
*the provided mapping function to each element. Each mapped stream is
*{@link java.util.stream.BaseStream#close() closed} after its contents
*have been placed into this stream. (If a mapped stream is {@code null}
*an empty stream is used, instead.)
*/
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Example:
Arrays.stream(arr) // 将字符串数组转化成流Stream<String>
.map(w -> w.split("")) // 将每个字符串分解成字母,每一个字符串都分解成数组Stream<String[]>
.flatMap(Arrays::stream) // 将流扁平化,每一个字符作为字符串都形成一条新的流Stream<String>
.distinct()
.forEach(System.out::println);
检查谓词是否匹配一个元素anyMatch(), 终端操作。
/**
*Returns whether any elements of this stream match the provided
*predicate. May not evaluate the predicate on all elements if not
*necessary for determining the result. If the stream is empty then
*{@code false} is returned and the predicate is not evaluated.
*/
boolean anyMatch(Predicate<? super T> predicate);
Exmaple:
if(menu.stream().anyMatch(Dish::isVegetarian))
System.out.println("Contains vegetarian food.");
检查谓词是否满足所有的元素allMatch(), 终端操作。
/**
*Returns whether all elements of this stream match the provided predicate.
*May not evaluate the predicate on all elements if not necessary for
*determining the result. If the stream is empty then {@code true} is
*returned and the predicate is not evaluated.
*/
boolean allMatch(Predicate<? super T> predicate);
Example:
System.out.println(menu.stream().allMatch(Dish::isVegetarian));
完全不满足noneMatch(), 终端操作。
/**
*Returns whether no elements of this stream match the provided predicate.
*May not evaluate the predicate on all elements if not necessary for
*determining the result. If the stream is empty then {@code true} is
*returned and the predicate is not evaluated.
*/
boolean noneMatch(Predicate<? super T> predicate);
Example:
System.out.println(menu.stream().noneMatch(Dish::isVegetarian));
查找复合要求的元素
- findAny(), 返回一个找到的符合要求的元素。不一定是空间上的第一个。如下的返回值是6
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9); list.parallelStream() .filter(x -> x % 3 == 0) .findAny() .ifPresent(System.out::println);
- findFirst(),返回第一个符合要求的元素,必定是空间上的第一个。
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9); list.parallelStream() .filter(x -> x % 3 == 0) .findFirst() .ifPresent(System.out::println);
规约 reduce
- 规约将流中的所有的元素反复结合起来最后成为一个值。 reduce(int default, BiFunction bf)
int result = Arrays.stream(arr1) .reduce(0, (a, b) -> a + b); System.out.println(result);
- reduce(BiFunction bf): 因为没有了默认值作为基础,所以有可能得到null,此时我们将返回值设置成OptionalInt避免空指针问题。
int[] arr1 = {1,2,3,4,5,6,7,8}; OptionalInt result = Arrays.stream(arr1) .reduce((a, b) -> a + b); if(result.isPresent()) System.out.println(result.getAsInt());
创建流
- 通过值创建流
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
- 通过数组创建流
int[] arr = {1,2,3,4,5,6,7,8,9}; System.out.println(Arrays.stream(arr).sum());
- 由文件生成流
try (Stream<String> lines = Files.lines(Paths.get("E:\\LocalProject\\Concurrent\\src\\main\\java\\ca\\mcmaster\\concurrent\\pojo\\MyReader.java"), Charset.defaultCharset())){ lines.map(String::length) .forEach(System.out::println); }
- 创建无限流,主要有两个静态方法iterate和generate。一般和limit结合起来使用。
Stream.iterate(0, n -> n + 2) .limit(20) .forEach(System.out::println); Stream.generate(Math::random) .limit(100) .forEach(System.out::println);
使用流收集数据
规约和汇总
- 获得流中数据的个数
Long number = menu.parallelStream().collect(Collectors.counting()); System.out.println(number); Long number1 = menu.parallelStream().count(); System.out.println(number1);
- 查找流中的最大值和最小值
// Max Optional<Dish> max = menu.parallelStream().max((d1, d2) -> d1.getCalories() - d2.getCalories()); Optional<Dish> max1 = menu.parallelStream().collect(Collectors.maxBy((d1, d2) -> d1.getCalories() - d2.getCalories())); if(max.isPresent()) System.out.println(max.get()); if(max1.isPresent()) System.out.println(max1.get()); // Min Optional<Dish> max = menu.parallelStream().min((d1, d2) -> d1.getCalories() - d2.getCalories()); Optional<Dish> max1 = menu.parallelStream().collect(Collectors.minBy((d1, d2) -> d1.getCalories() - d2.getCalories())); if(max.isPresent()) System.out.println(max.get()); if(max1.isPresent()) System.out.println(max1.get());
- 汇总,将流中的某个域进行汇总
// 计算总和 Integer collect = menu.parallelStream().collect(Collectors.summingInt(Dish::getCalories)); System.out.println(collect); // 计算平均值 Double collect = menu.parallelStream().collect(Collectors.averagingInt(Dish::getCalories)); System.out.println(collect);
- 连接字符串, 首先要确保当前的流已经被映射到了一个字符串流上。
String collect = menu.parallelStream().map(Dish::getName).collect(Collectors.joining()); System.out.println(collect);
广义的规约和汇总, 使用collect和reducing
/**
*Returns a {@code Collector} which performs a reduction of its
*input elements under a specified mapping function and
*{@code BinaryOperator}. This is a generalization of
*{@link #reducing(Object, BinaryOperator)} which allows a transformation
*of the elements before reduction.
*@param identity the identity value for the reduction (also, the value that is returned when there are no input elements)
*@param mapper a mapping function to apply to each input value
*@param op a {@code BinaryOperator<U>} used to reduce the mapped values
public static <T, U> Collector<T, ?, U> reducing(U identity,
Function<? super T, ? extends U> mapper,
BinaryOperator<U> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); },
(a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
a -> a[0], CH_NOID);
}
Example:
Integer sumCalories = menu.parallelStream().collect(Collectors.reducing(0, Dish::getCalories, (n1, n2) -> n1 + n2));
System.out.println(sumCalories);
- 上一个chapter中所有提及的规约方式均可以通过reducing实现,我们可以通过Function<U, R>来实现mapping, 而我们使用BiFunction来实现我们的规约操作。
- 实现的方法更多了,但是可读性变差了。
分组
分组的返回值是一个Map,对于每一个项目,键是我们定义的classifier,而值是符合这个键的对象的列表。
Map<Type, List<Dish>> map = menu.stream().collect(Collectors.groupingBy(Dish::getType));
System.out.println(map.toString());
- 我们自己手动扩充分组函数
Map<caloriesType, List<Dish>> collect = menu.stream().collect(Collectors.groupingBy(dish -> { if (dish.getCalories() >= 700) return caloriesType.HIGH; else if (dish.getCalories() <= 400) return caloriesType.LOW; else return caloriesType.MEDIUM; })); System.out.println(collect.toString());
- 多级分组
Map<Type, Map<caloriesType, List<Dish>>> collect = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.groupingBy(dish -> { if (dish.getCalories() >= 700) return caloriesType.HIGH; else if (dish.getCalories() <= 400) return caloriesType.LOW; else return caloriesType.MEDIUM; }))); System.out.println(collect.toString());
- 将收集器的结果转换成另一种类型
Map<Type, Dish> collect = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(Dish::getCalories)), Optional::get))); System.out.println(collect);
分区
分区的好处在于保留了分区函数返回true和false的两套流元素。
Map<Boolean, Map<Type, List<Dish>>> collect = menu.stream().
collect(Collectors.partitioningBy(Dish::isVegetarian,
Collectors.groupingBy(Dish::getType)));
Collector接口分析
public interface Collector<T, A, R> {
/**
*A function that creates and returns a new mutable result container.
*@return a function which returns a new, mutable result container
*/
Supplier<A> supplier();
/**
*A function that folds a value into a mutable result container.
*@return a function which folds a value into a mutable result container
*/
BiConsumer<A, T> accumulator();
/**
*A function that accepts two partial results and merges them. The
*combiner function may fold state from one argument into the other and
*return that, or may return a new result container.
*@return a function which combines two partial results into a combined
*result
*/
BinaryOperator<A> combiner();
/**
*Perform the final transformation from the intermediate accumulation type
*{@code A} to the final result type {@code R}.
*<p>If the characteristic {@code IDENTITY_TRANSFORM} is
*set, this function may be presumed to be an identity transform with an
*unchecked cast from {@code A} to {@code R}.
*@return a function which transforms the intermediate result to the final
*result
*/
Function<A, R> finisher();
/**
*Returns a {@code Set} of {@code Collector.Characteristics} indicating
*the characteristics of this Collector. This set should be immutable.
*@return an immutable set of collector characteristics
*/
Set<Characteristics> characteristics();
}
- supplier: 返回一个空的容器。
- accumulator:将一个元素添加到容器之中。
- combiner:将两个容器合并。
- finisher: 对结果容器应用的最终转换。
- characteristics:对流进行定义。
实现自己的ToListCollector
public class ToListCollector <T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() {
return LinkedList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
return (list, n) -> list.add(n);
}
@Override
public BinaryOperator<List<T>> combiner() {
return (l1, l2) -> {l1.addAll(l2)};
}
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
}
}
Subscribe via RSS