• Nenhum resultado encontrado

Futures, Scheduling, and Work Distribution

N/A
N/A
Protected

Academic year: 2022

Share "Futures, Scheduling, and Work Distribution"

Copied!
68
0
0

Texto

(1)

Futures, Scheduling, and Work Distribution

Companion slides for

The Art of Multiprocessor

Programming

(2)

How to write Parallel Apps?

•   How to

–   split a program into parallel parts in an effective way

–   Thread management

(3)

Matrix Multiplication

(4)

Matrix Multiplication

c ij = ∑ k=0 N-1 a ki * b jk

(5)

Matrix Multiplication

class Worker extends Thread { int row, col;

Worker(int row, int col) {

this.row = row; this.col = col;

}

public void run() {

double dotProduct = 0.0;

for (int i = 0; i < n; i++)

dotProduct += a[row][i] * b[i][col];

c[row][col] = dotProduct;

(6)

Matrix Multiplication

class Worker extends Thread { int row, col;

Worker(int row, int col) {

this.row = row; this.col = col;

}

public void run() {

double dotProduct = 0.0;

for (int i = 0; i < n; i++)

dotProduct += a[row][i] * b[i][col];

c[row][col] = dotProduct;

a thread

(7)

Matrix Multiplication

class Worker extends Thread { int row, col;

Worker(int row, int col) {

this.row = row; this.col = col;

}

public void run() {

double dotProduct = 0.0;

for (int i = 0; i < n; i++)

dotProduct += a[row][i] * b[i][col];

c[row][col] = dotProduct;

Which matrix entry

to compute

(8)

Matrix Multiplication

class Worker extends Thread { int row, col;

Worker(int row, int col) {

this.row = row; this.col = col;

}

public void run() {

double dotProduct = 0.0;

for (int i = 0; i < n; i++)

dotProduct += a[row][i] * b[i][col];

c[row][col] = dotProduct;

Actual computation

(9)

Matrix Multiplication

void multiply() {

Worker[][] worker = new Worker[n][n];

for (int row …) for (int col …)

worker[row][col] = new Worker(row,col);

for (int row …) for (int col …)

worker[row][col].start();

for (int row …)

for (int col …)

(10)

Matrix Multiplication

void multiply() {

Worker[][] worker = new Worker[n][n];

for (int row …) for (int col …)

worker[row][col] = new Worker(row,col);

for (int row …) for (int col …)

worker[row][col].start();

for (int row …)

for (int col …) Create nxn

threads

(11)

Matrix Multiplication

void multiply() {

Worker[][] worker = new Worker[n][n];

for (int row …) for (int col …)

worker[row][col] = new Worker(row,col);

for (int row …) for (int col …)

worker[row][col].start();

for (int row …) for (int col …)

Start them

(12)

Matrix Multiplication

void multiply() {

Worker[][] worker = new Worker[n][n];

for (int row …) for (int col …)

worker[row][col] = new Worker(row,col);

for (int row …) for (int col …)

worker[row][col].start();

for (int row …)

for (int col …) Wait for

Start them

(13)

Matrix Multiplication

void multiply() {

Worker[][] worker = new Worker[n][n];

for (int row …) for (int col …)

worker[row][col] = new Worker(row,col);

for (int row …) for (int col …)

worker[row][col].start();

for (int row …)

for (int col …) Wait for

What’s wrong with this picture?

Start them

(14)

Thread Overhead

•   Threads Require resources

–   Memory for stacks –   Setup, teardown

•   Scheduler overhead

•   Worse for short-lived threads

(15)

Thread Pools

•   More sensible to keep a pool of long -lived threads

•   Threads assigned short-lived tasks

–   Runs the task –   Rejoins pool

–   Waits for next assignment

(16)

Thread Pool = Abstraction

•   Insulate programmer from platform

–   Big machine, big pool –   And vice-versa

•   Portable code

–   Runs well on any platform

–   No need to mix algorithm/platform

concerns

(17)

ExecutorService Interface

•   In java.util.concurrent

–   Task = Runnable object

•   If no result value expected

•   Calls run() method.

–   Task = Callable<T> object

•  If result value of type T expected

•   Calls T call() method.

(18)

Future<T>

Callable<T> task = …;

Future<T> future = executor.submit(task);

T value = future.get();

(19)

Future<T>

Callable<T> task = …;

Future<T> future = executor.submit(task);

T value = future.get();

Submitting a Callable<T> task

returns a Future<T> object

(20)

Future<T>

Callable<T> task = …;

Future<T> future = executor.submit(task);

T value = future.get();

The Future’s get() method blocks

until the value is available

(21)

Future<?>

Runnable task = …;

Future<?> future = executor.submit(task);

future.get();

(22)

Future<?>

Runnable task = …;

Future<?> future = executor.submit(task);

future.get();

Submitting a Runnable task

returns a Future<?> object

(23)

Future<?>

Runnable task = …;

Future<?> future = executor.submit(task);

future.get();

The Future’s get() method blocks

until the computation is complete

(24)

Note

•   Executor Service submissions

–   Like New England traffic signs –   Are purely advisory in nature

•   The executor

–   Like the New England driver

–   Is free to ignore any such advice

– And could execute tasks sequentially …

(25)

Matrix Addition

(26)

Matrix Addition

4 parallel additions

(27)

Matrix Addition Task

class AddTask implements Runnable { Matrix a, b; // multiply this!

public void run() { if (a.dim == 1) {

c[0][0] = a[0][0] + b[0][0]; // base case } else {

(partition a, b into half-size matrices a

ij

and b

ij

) Future<?> f

00

= exec.submit(add(a

00

,b

00

));

Future<?> f

11

= exec.submit(add(a

11

,b

11

));

f

00

.get(); …; f

11

.get();

(28)

Matrix Addition Task

class AddTask implements Runnable { Matrix a, b; // multiply this!

public void run() { if (a.dim == 1) {

c[0][0] = a[0][0] + b[0][0]; // base case } else {

(partition a, b into half-size matrices a

ij

and b

ij

) Future<?> f

00

= exec.submit(add(a

00

,b

00

));

Future<?> f

11

= exec.submit(add(a

11

,b

11

));

f

00

.get(); …; f

11

.get();

Base case: add directly

(29)

Matrix Addition Task

class AddTask implements Runnable { Matrix a, b; // multiply this!

public void run() { if (a.dim == 1) {

c[0][0] = a[0][0] + b[0][0]; // base case } else {

(partition a, b into half-size matrices a

ij

and b

ij

) Future<?> f

00

= exec.submit(add(a

00

,b

00

));

Future<?> f

11

= exec.submit(add(a

11

,b

11

));

f

00

.get(); …; f

11

.get();

Constant-time operation

(30)

Matrix Addition Task

class AddTask implements Runnable { Matrix a, b; // multiply this!

public void run() { if (a.dim == 1) {

c[0][0] = a[0][0] + b[0][0]; // base case } else {

(partition a, b into half-size matrices a

ij

and b

ij

) Future<?> f

00

= exec.submit(add(a

00

,b

00

));

Future<?> f

11

= exec.submit(add(a

11

,b

11

));

f

00

.get(); …; f

11

.get();

Submit 4 tasks

(31)

Matrix Addition Task

class AddTask implements Runnable { Matrix a, b; // multiply this!

public void run() { if (a.dim == 1) {

c[0][0] = a[0][0] + b[0][0]; // base case } else {

(partition a, b into half-size matrices a

ij

and b

ij

) Future<?> f

00

= exec.submit(add(a

00

,b

00

));

Future<?> f

11

= exec.submit(add(a

11

,b

11

));

f

00

.get(); …; f

11

.get();

Let them finish

(32)

Dependencies

•   Matrix example is not typical

•   Tasks are independent

–   Don’t need results of one task … –   To complete another

•   Often tasks are not independent

(33)

Fibonacci

1 if n = 0 or 1 F(n)

F(n-1) + F(n-2) otherwise

•   Note

–   potential parallelism

–   Dependencies

(34)

Disclaimer

•   This Fibonacci implementation is

–   Egregiously inefficient

•   So don’t deploy it!

–   But illustrates our point

•  How to deal with dependencies

•   Exercise:

–   Make this implementation efficient!

(35)

Multithreaded Fibonacci

class FibTask implements Callable<Integer> { static ExecutorService exec =

Executors.newCachedThreadPool();

int arg;

public FibTask(int n) { arg = n;

}

public Integer call() { if (arg > 2) {

Future<Integer> left = exec.submit(new FibTask(arg-1));

Future<Integer> right = exec.submit(new FibTask(arg-2));

return left.get() + right.get();

(36)

Multithreaded Fibonacci

class FibTask implements Callable<Integer> { static ExecutorService exec =

Executors.newCachedThreadPool();

int arg;

public FibTask(int n) { arg = n;

}

public Integer call() { if (arg > 2) {

Future<Integer> left = exec.submit(new FibTask(arg-1));

Future<Integer> right = exec.submit(new FibTask(arg-2));

return left.get() + right.get();

Parallel calls

(37)

Multithreaded Fibonacci

class FibTask implements Callable<Integer> { static ExecutorService exec =

Executors.newCachedThreadPool();

int arg;

public FibTask(int n) { arg = n;

}

public Integer call() { if (arg > 2) {

Future<Integer> left = exec.submit(new FibTask(arg-1));

Future<Integer> right = exec.submit(new FibTask(arg-2));

return left.get() + right.get();

Pick up & combine results

(38)

Dynamic Behavior

•   Multithreaded program is

–   A directed acyclic graph (DAG) –   That unfolds dynamically

•   Each node is

–   A single unit of work

(39)

Fib DAG

fib(4)

fib(3) fib(2)

submit get

fib(2) fib(1) fib(1) fib(1)

(40)

Arrows Reflect Dependencies

fib(4)

fib(3) fib(2)

submit get

fib(2) fib(1) fib(1) fib(1)

(41)

How Parallel is That?

•   Define work:

–   Total time on one processor

•   Define critical-path length:

–   Longest dependency path

–   Can’t beat that!

(42)

Fib Work

fib(4)

fib(3) fib(2)

fib(2) fib(1) fib(1) fib(1)

(43)

Fib Work

1 2 3

8

4 5 6 7 9

14

10 11 12 13 15

(44)

Fib Critical Path

fib(4)

(45)

Fib Critical Path

fib(4)

1 8

2 7

3 4 6

(46)

threads and tasks

(47)

Work Distribution

zzz…

(48)

Work Dealing

Yes!

(49)

The Problem with Work Dealing

D’oh!

D’oh!

D’oh!

(50)

Work Stealing No

work…

Yes! zzz

(51)

Work Stealing

•   Each thread has a pool of ready work

•   Remove work

•   If you run out of work, steal someone else’s

•   Choose victim at random

(52)

Local Work Pools

Each work pool is a Double-Ended Queue

(53)

Work DEQueue 1

pushBottom

popBottom

work

(54)

Obtain Work

•   Obtain work

•   Run thread until

•   Blocks or terminates

popBottom

(55)

New Work

pushBottom

(56)

Whatcha Gonna do When the Well Runs Dry?

@&%$!!

empty

(57)

Steal Work from Others

Pick random guy’s DEQeueue

(58)

Steal this Thread!

popTop

(59)

Variations

•   What if

–   Randomly balance loads?

(60)

Work Balancing

5

2

b2+5 c/2=3

d2+5e/2=4

(61)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

(62)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

Keep running tasks

(63)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

With probability

1/|queue|

(64)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

Choose random victim

(65)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

Lock queues in canonical order

(66)

Work-Balancing Thread

public void run() {

int me = ThreadID.get();

while (true) {

Runnable task = queue[me].deq();

if (task != null) task.run();

int size = queue[me].size();

if (random.nextInt(size+1) == size) { int victim = random.nextInt(queue.length);

int min = …, max = …;

synchronized (queue[min]) { synchronized (queue[max]) {

balance(queue[min], queue[max]);

Rebalance queues

(67)

Work Stealing & Balancing

•   Clean separation between app &

scheduling layer

•   Works well when number of processors fluctuates.

•   Works on “black-box” operating

systems

(68)

          

This work is licensed under a

Creative Commons Attribution-ShareAlike 2.5 License.

•  You are free:

–  to Share — to copy, distribute and transmit the work –  to Remix — to adapt the work

•  Under the following conditions:

–  Attribution. You must attribute the work to “The Art of

Multiprocessor Programming” (but not in any way that suggests that the authors endorse you or your use of the work).

–  Share Alike. If you alter, transform, or build upon this work, you may distribute the resulting work only under the same, similar or a compatible license.

•  For any reuse or distribution, you must make clear to others the license terms of this work. The best way to do this is with a link to

–  http://creativecommons.org/licenses/by-sa/3.0/.

•  Any of the above conditions can be waived if you get permission from the copyright holder.

•  Nothing in this license impairs or restricts the author's moral rights.

Referências

Documentos relacionados

 Nos últimos anos, o crescimento dos preços da habitação tem sido associado a uma queda forte do desemprego e ao crescimento do rendimento disponível, ao passo que a evolução

This is an open access article distributed under the terms of the Creative Commons Attribution License , which permits unrestricted use, distribution, and reproduction in any

This is an open-access article distributed under the terms of the Creative Commons Attribution License, which permits unrestricted use, distribution, and reproduction in any

A mais conhecida delas talvez seja aquela onde Kierkegaard está no centro do universo e todas as demais coisas gravitam ao seu redor, o periódico aponta que

do Prefeito, do Vice-Prefeito, dos Secretários Municipais, dos Diretores de Autarquias, Fundações e Empresas Públicas Municipais para a Legislatura seguinte, subsídios estes que

A principal restrição do Crest Factor é que quando usado para identificar falhas de rolamento, ele não aumenta de forma linear à medida que o rolamento degrada.. Na verdade, o

Pressione a tecla de função Manual para definir o ajuste de faixa do termovisor como manual; pressione a tecla Automático para ajuste automático.. Ajuste rápido do ajuste de

To protect the Project Gutenberg-tm mission of promoting the free distribution of electronic works, by using or distributing this work (or any other work associated in any way with