import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Fork/join demo: turns a list of URLs into one concatenated report string by
 * recursively splitting the index range and processing the pieces in a
 * {@link ForkJoinPool}.
 */
public class Test {

    /** Number of baidu/sina URL pairs placed in the sample list. */
    private static final int SAMPLE_PAIR_COUNT = 11;

    /** Sample input: 22 URLs alternating between baidu and sina, baidu first. */
    static List<String> urls = buildSampleUrls();

    // A ForkJoinPool is essentially a thread pool. The no-argument constructor sizes
    // it to the number of CPU cores; here the parallelism is fixed at 3 instead.
    // NOTE(review): asyncMode=true selects FIFO scheduling, which the JDK documents
    // as intended for forked tasks that are never joined. The tasks here are joined,
    // so the default (false) may be a better fit - confirm before changing.
    static ForkJoinPool forkJoinPool = new ForkJoinPool(3,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            true);

    /** Builds the mutable sample URL list without resorting to an anonymous ArrayList subclass. */
    private static List<String> buildSampleUrls() {
        List<String> samples = new ArrayList<>(SAMPLE_PAIR_COUNT * 2);
        for (int i = 0; i < SAMPLE_PAIR_COUNT; i++) {
            samples.add("http://www.baidu.com");
            samples.add("http://www.sina.com");
        }
        return samples;
    }

    /**
     * Produces the report line for one URL. No network access is performed; the
     * line is built purely from the arguments.
     *
     * @param url   the URL to report on
     * @param index position of the URL in the list being processed
     * @return {@code index + " test " + url} followed by a newline
     */
    public static String doRequest(String url, int index) {
        return index + " test " + url + "\n";
    }

    /**
     * Recursive task that concatenates {@link #doRequest(String, int)} results for
     * the half-open index range {@code [start, end)} of {@code urls}, in index order.
     */
    static class Job extends RecursiveTask<String> {

        /** Ranges with at most this many elements are processed sequentially. */
        private static final int SEQUENTIAL_THRESHOLD = 10;

        List<String> urls;
        int start;
        int end;

        /**
         * @param urls  list the task reads from
         * @param start first index to process (inclusive)
         * @param end   index to stop at (exclusive)
         */
        public Job(List<String> urls, int start, int end) {
            this.urls = urls;
            this.start = start;
            this.end = end;
        }

        /**
         * Processes the range directly when it is small enough, otherwise splits it
         * in half and combines the two partial results.
         *
         * @return the report lines for {@code [start, end)}, lowest index first;
         *         the empty string when the range is empty
         */
        @Override
        protected String compute() {
            if (end - start <= SEQUENTIAL_THRESHOLD) {
                // StringBuilder avoids re-copying the accumulated text on every append.
                StringBuilder lines = new StringBuilder();
                for (int i = start; i < end; i++) {
                    lines.append(doRequest(urls.get(i), i));
                }
                return lines.toString();
            }

            // Overflow-safe midpoint; equal to (start + end) / 2 for valid list indices.
            int mid = start + (end - start) / 2;
            Job left = new Job(urls, start, mid);
            Job right = new Job(urls, mid, end);

            // Standard fork/join shape: hand one half to the pool, compute the other
            // half on this worker, then join. Forking both halves and joining them in
            // submission order would leave this worker waiting instead of working.
            left.fork();
            String rightLines = right.compute();
            return left.join() + rightLines;
        }
    }

    /**
     * Submits one {@link Job} covering the whole sample list and prints its result.
     *
     * @throws ExecutionException   if the task fails with an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Job job = new Job(urls, 0, urls.size());
        ForkJoinTask<String> forkJoinTask = forkJoinPool.submit(job);
        String s = forkJoinTask.get();
        System.out.println(s);
    }
}
Fork/Join 框架采用的是分而治之的思想:把一个大任务拆分成若干小任务并行执行,再把各个小任务的结果合并。它的特点是工作窃取(work-stealing):每个工作线程维护一个双端队列;当某个线程把自己队列中的任务执行完之后,如果其他线程还有尚未执行完的任务,它会把这些任务窃取过来执行。窃取时从对方队列的底部取任务。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Fork/join task that adds up every integer in the inclusive range
 * {@code [start, end]}. Large ranges are split in half recursively; small ones
 * are summed with a plain loop.
 */
public class NumberSum extends RecursiveTask<Long> {

    /** Ranges whose {@code end - start} is below this value are summed sequentially. */
    private static final long THRESHOLD = 10_000L;

    // Primitive fields: the original boxed Long fields forced a box/unbox on every
    // loop iteration and every arithmetic step.
    private final long start;
    private final long end;

    /**
     * @param start first value to include in the sum
     * @param end   last value to include in the sum
     * @throws NullPointerException if either argument is {@code null}
     */
    public NumberSum(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range directly when it is small enough, otherwise splits it at the
     * midpoint and adds the two partial sums.
     *
     * @return the sum of all values from {@code start} to {@code end} inclusive,
     *         or 0 when {@code start > end}
     */
    @Override
    protected Long compute() {
        if (end - start < THRESHOLD) {
            long sum = 0L;
            for (long value = start; value <= end; value++) {
                sum += value;
            }
            return sum;
        }

        // start + (end - start) / 2 cannot overflow the way (start + end) / 2 does
        // when both bounds are large.
        long mid = start + (end - start) / 2;
        NumberSum lower = new NumberSum(start, mid);
        NumberSum upper = new NumberSum(mid + 1, end);

        // Fork one half, compute the other on this worker, then join, so the
        // current thread does useful work instead of only waiting on two joins.
        lower.fork();
        long upperSum = upper.compute();
        return lower.join() + upperSum;
    }

    // Pool with a fixed parallelism of 3.
    // NOTE(review): asyncMode=true selects FIFO scheduling, which the JDK documents
    // as intended for forked tasks that are never joined - confirm it is wanted here.
    static ForkJoinPool forkJoinPool = new ForkJoinPool(3,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            true);

    /**
     * Sums 0 through 100,000,000 on the pool and prints the result.
     *
     * @throws ExecutionException   if the task fails with an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NumberSum numberSum = new NumberSum(0L, 100_000_000L);
        ForkJoinTask<Long> future = forkJoinPool.submit(numberSum);
        System.out.println(future.get());
    }
}