Distributed Systems

Books

Introduction

A distributed system is a system composed of several computers that are connected in some way and that together provide some service or achieve some goal. The hardware of a distributed system may, for instance, be a VLSI chip, a shared-memory multiprocessor machine, a LAN of workstations or PCs, or a global network.

In this course we study some ideas, concepts and techniques that are useful in understanding, designing and implementing distributed systems.

1  A Mathematical Model

We begin by describing a mathematical model that can be used to express many distributed systems.




Consider a finite connected graph (P, L). We call the nodes processes and the edges links. A link makes it possible to communicate data from a set D between the two processes it joins.

Example: Suppose the graph has 12 processes P0, ..., P11 connected in a ring, i.e. P = {P0, ..., P11} and L = {(Pi, Pi+1) | 0 ≤ i < 12}, where arithmetic is mod 12. Let D = N ∪ {done}.


[Diagram: the processes P0, ..., P11 connected in a ring]


The set M of messages is P × D × P.

Example: The message (P3, done, P4) is the datum done sent from P3 to P4.




The state of a process at a given moment is determined by the control state it is in, the data it is storing, and the messages that have been delivered to it but not yet examined by it.

Example: Suppose each Pi has an associated name ni ∈ N. Let the set S of states be Q × N × F(M), where Q = {init, active, leader, follower} and F(M) is the set of all finite multisets[1] of messages. A state of P2 might be (active, 20, {(P1, 0, P2), (P1, 10, P2)}).




A configuration of a distributed system describes the state of each of its processes and the collection of messages that have been sent but not delivered.

Example: The set C of configurations is S^12 × F(M).




Each process has a transition function that determines what, if anything, it may do in a given state. Transition functions are sometimes described explicitly, as in the next example, and sometimes implicitly, by some text, pseudo-code, or program.




Example: The transition function of Pi, for 0 ≤ i < 12, is δi : S → S × F(M), defined by:
  δi(init, n, in) = ((active, n, in), {(Pi, n, Pi+1)})
  δi(q, n, in) = ((q, n, ∅), ∅)
      if q = leader or q = follower or (q = active and in = ∅)
  δi(active, n, in) =
      if done ∈ data(in) then ((follower, n, ∅), {(Pi, done, Pi+1)})
      else if n ∈ data(in) then ((leader, n, ∅), {(Pi, done, Pi+1)})
      else if n > max(in) then ((active, n, ∅), ∅)
      else if n < max(in) then ((active, n, ∅), {(Pi, max(in), Pi+1)})
  where data(in) is the set of data in the message multiset in,
  and max(in) is the largest integer in that set (here in ≠ ∅).

Very rough sketch of the control-state transitions of δi (each state other than init also loops back to itself):

    init --> active --> leader
               |
               v
            follower
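There are two kinds of event: a computation event comp(i), in which Pi applies its transition function δi, and a delivery event del(m), in which an undelivered message m is delivered to its recipient. If an event e is possible in a configuration C, it may occur and result in a configuration C', written C --e--> C'.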

Example

  1. Suppose C = (S0, ..., S11, U) and δi(Si) = (Si', out). Then C --comp(i)--> C' = (S0, ..., Si', ..., S11, U ∪ out).

  2. Suppose C = (S0, ..., S11, U ∪ {m}), where m = (Pj, d, Pi). Then C --del(m)--> C' = (S0, ..., Si', ..., S11, U), where if Si = (q, n, in) then Si' = (q, n, in ∪ {m}).




Q has a nonempty subset Qinit of initial control states, and a subset Qfinal of final control states. A state is initial (final) if its control state is. A configuration is initial (final) if the state of each of its processes is.

Example: Qinit = {init} and Qfinal = {leader, follower}.

Also, Sinit = Qinit × N × {∅} and Sfinal = Qfinal × N × F(M).

Also, Cinit = (Sinit)^12 × F(M) and Cfinal = (Sfinal)^12 × F(M).





In general, if δ is a transition function, S is a final state, and δ(S) = (S', out), then S' is final. Hence, if C is final and C --e--> C', then C' is final (a delivery event does not change any control state).

An evolution of a distributed system described by a configuration C0 is of the form C0 --e0--> C1 --e1--> C2 --e2--> .... An evolution C0 --e0--> C1 --e1--> ... is terminating if there exists j such that Cj is final.

If C0 describes an asynchronous system, then an evolution C0 --e0--> C1 --e1--> ... is admissible if:
  1. For each process Pi there are infinitely many j such that ej = comp(i).
  2. If ej = comp(i) and m is created in ej, then ek = del(m) for some k.
In an admissible evolution of an asynchronous system, each process performs infinitely many transitions, and every message that is created is delivered. There need be no finite upper bound on the number of events between two transitions of a process or between the creation and the delivery of a message.
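The model can be exercised with a small simulator. What follows is a minimal sketch, assuming a ring of k processes with distinct integer names (the running example fixes k = 12). The function delta encodes δi as defined above. Each loop iteration performs one event, either comp(i) or del(m), chosen at random. The run stops at the first final configuration. Because a random scheduler is fair only with probability 1, it samples evolutions that approximate admissible ones and does not enumerate them. The names delta and run_async are illustrative, not from the notes. The loop also checks, in every configuration, the bounds on n* and done messages that Lemma 4 below states.

```python
import random

DONE = "done"


def delta(i, k, state):
    """delta_i on a ring of k processes. A state is (control, name, inbox);
    the inbox list stands for the multiset `in` of delivered messages."""
    q, n, inbox = state
    right = (i + 1) % k
    if q == "init":
        return ("active", n, inbox), [(i, n, right)]
    if q in ("leader", "follower") or not inbox:
        return (q, n, []), []
    data = {d for (_, d, _) in inbox}
    if DONE in data:
        return ("follower", n, []), [(i, DONE, right)]
    if n in data:
        return ("leader", n, []), [(i, DONE, right)]
    top = max(data)                        # only integers remain here
    if n > top:
        return ("active", n, []), []
    return ("active", n, []), [(i, top, right)]


def run_async(names, seed=0, max_steps=100_000):
    """Random evolution from the initial configuration: each step is comp(i)
    for a random i, or del(m) for a random undelivered message m."""
    rng = random.Random(seed)
    k, n_star = len(names), max(names)
    states = [("init", n, []) for n in names]
    undelivered = []                       # the multiset U
    for _ in range(max_steps):
        if all(s[0] in ("leader", "follower") for s in states):
            break                          # final configuration reached
        if undelivered and rng.random() < 0.5:
            m = undelivered.pop(rng.randrange(len(undelivered)))   # del(m)
            q, n, inbox = states[m[2]]
            states[m[2]] = (q, n, inbox + [m])
        else:
            i = rng.randrange(k)                                    # comp(i)
            states[i], out = delta(i, k, states[i])
            undelivered.extend(out)
        # Lemma 4, items 5 and 9: at most one n* message and one done message.
        pending = undelivered + [msg for s in states for msg in s[2]]
        assert sum(msg[1] == n_star for msg in pending) <= 1
        assert sum(msg[1] == DONE for msg in pending) <= 1
    return [s[0] for s in states]


print(run_async([3, 7, 1, 5], seed=1))  # ['follower', 'leader', 'follower', 'follower']
```

Changing the seed changes the order of events but, as Corollary 6 below asserts, not the outcome.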





If C0 describes a synchronous system, then an evolution C0 --e0--> C1 --e1--> ... is admissible if it consists of a sequence of rounds. In a round,
  1. all outstanding messages are delivered, and then
  2. each process performs a computation step, which creates at most one message addressed to any given process.
There is a fixed bound on the number of events between the creation and delivery of a message.

2  Leader Election

The leader election problem is to design a process P that stores just two data items, its name n : N and its status s : {null, leader, follower}, such that the following holds for every k > 1. Let S be a system consisting of k copies of P, which may differ only in the names they store, connected in a ring, with initially no undelivered messages. Then every admissible evolution of S contains a final configuration in which the status of one copy of P is leader and the status of every other copy is follower.

A. The synchronous case

Fix k>1. Choose distinct n0, ..., nk-1. P has variables s, initially null, and n, initially ni in Pi. P's transitions are described as follows, where c refers to the clockwise neighbour:
init = c!n → active
leader = ?in → leader
follower = ?in → follower
active = ?in → IF (
    done ∈ data(in) : s := follower → c!done → follower
    n ∈ data(in) : s := leader → c!done → leader
    n < max(in) : c!max(in) → active
    n > max(in) : active)

[Notes: (1) The IF construct means "if ... then ... elsif ... then ... elsif ... then ...", and so on. (2) An empty set has no maximum, so the IF does not appear to cover the case in = ∅. It works if you take max(∅) = -1, so that the last branch is executed whenever in = ∅.]

Let n* = max{n0, ..., nk-1}, and suppose n* = nm. (Process indices are taken mod k.)



Lemma 1

Suppose 0 ≤ r < k.
  1. In round r,
     (a) Pm+r sends n*;
     (b) if j ≠ m+r then Pj does not send n*;
     (c) no Pj sends done;
     (d) if j ≠ m then no Pi with i ∈ [m ... j-1] sends nj.
  2. At the end of round r, for every j,
     (a) Pj is in control state active;
     (b) sj = null.

Lemma 2

Suppose 0 ≤ r < k.
  1. In round k+r, Pm+r sends done.
  2. At the end of round k+r,
     (a) Pm is in control state leader and sm = leader;
     (b) if j ∈ (m, m+r] then Pj is in control state follower and sj = follower;
     (c) if j ∈ (m+r, m) then Pj is in control state active and sj = null.

Corollary 3

At the end of round 2k-1,
  1. sm = leader
  2. If j ¹ m then sj = follower

Message complexity

The number of messages created in the first 2k rounds is O(k^2): each of the k names is sent at most k times, and done is sent k times.
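Corollary 3 and the message bound can be checked on small rings with a round-by-round simulator. This is a minimal sketch, assuming distinct integer names and a hypothetical helper run_sync. In each round it first delivers every outstanding message and then lets every process apply δi once. It reports the first round at whose end the configuration is final, together with the number of messages created up to then. By Lemma 2 that round is always 2k-1. With names decreasing clockwise, name ni is forwarded until it reaches Pm, which gives k(k+1)/2 name messages plus k done messages. With names increasing clockwise, every name except n* is discarded after one hop, which gives 3k-1 messages in total.

```python
DONE = "done"


def delta(i, k, state):
    """delta_i on a ring of k processes; state = (control, name, inbox)."""
    q, n, inbox = state
    right = (i + 1) % k
    if q == "init":
        return ("active", n, inbox), [(i, n, right)]
    if q in ("leader", "follower") or not inbox:
        return (q, n, []), []
    data = {d for (_, d, _) in inbox}
    if DONE in data:
        return ("follower", n, []), [(i, DONE, right)]
    if n in data:
        return ("leader", n, []), [(i, DONE, right)]
    top = max(data)                            # only integers remain here
    return ("active", n, []), ([] if n > top else [(i, top, right)])


def run_sync(names):
    """Run rounds r = 0, 1, ... until the configuration is final. Returns
    (r, created): the first round at whose end every process is leader or
    follower, and the number of messages created in rounds 0..r."""
    k = len(names)
    states = [("init", n, []) for n in names]
    outstanding, created, r = [], 0, 0
    while True:
        for m in outstanding:                  # 1. deliver all outstanding messages
            q, n, inbox = states[m[2]]
            states[m[2]] = (q, n, inbox + [m])
        outstanding = []
        for i in range(k):                     # 2. each process takes one step
            states[i], out = delta(i, k, states[i])
            outstanding.extend(out)
            created += len(out)
        if all(s[0] in ("leader", "follower") for s in states):
            return r, created
        r += 1


print(run_sync(list(range(9, -1, -1))))  # (19, 65)
```

For k = 10 with decreasing names, the run ends in round 2k-1 = 19 after 10·11/2 + 10 = 65 messages. This is consistent with the quadratic bound.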

B. The asynchronous case

Consider the system S, but now viewed as being asynchronous.

Lemma 4

Suppose C is a configuration in an admissible evolution of S.
  1. Pj is leader iff sj = leader.
  2. Pj is follower iff sj = follower.
  3. Pj is init or active iff sj = null.
  4. If C contains (Pi, n*, Pi+1) then Pm is active and sj = null for j ≠ m.
  5. C contains at most one message of the form (Pi, n*, Pi+1).
  6. If j ≠ m and i ∈ [m ... j-1], then C does not contain (Pi, nj, Pi+1).
  7. If j ≠ m then sj ≠ leader.
  8. If C contains (Pm+j, done, Pm+j+1), where 0 ≤ j < k, then sm = leader, si = follower for i ∈ (m, m+j], and Pi is active for i ∈ (m+j, m).
  9. C contains at most one message of the form (Pi, done, Pi+1).

Lemma 5

Suppose C0 --e0--> C1 --e1--> ... is an admissible evolution of S.
  1. If 0 ≤ j < k then some Ch contains an undelivered message (Pm+j, n*, Pm+j+1).
  2. If 0 ≤ j < k then in some Ch, sm = leader, si = follower for i ∈ (m, m+j], and there is an undelivered message (Pm+j, done, Pm+j+1).

Corollary 6

Every admissible evolution of S contains a configuration in which sm = leader and sj = follower for j ¹ m.




Further reading on Leader Election: Lynch ch. 3 and 15, Attiya & Welch ch. 3

3  Logical clocks and vector clocks

Suppose C0 --e0--> C1 --e1--> ... is an evolution of an asynchronous system S consisting of n processes P1, ..., Pn. Assume that ei ≠ ej if i ≠ j and that no two messages created in the evolution are identical. Let e_i^0, e_i^1, ... be the events performed by Pi (in order of occurrence in the evolution).

Definition

The happens-before relation → is the smallest transitive relation such that:
  1. e_i^j → e_i^(j+1) for all i, j, and
  2. if m is created in e and consumed in e', then e → e'.
References: Lynch ch. 18, A&W ch. 6, Coulouris ch. 10, Mullender ch. 4

We derive an evolution of a system S'. Each Pi' has a logical clock, an integer variable ci, initially 0. If Pi performs ej, then Pi' performs ej' by carrying out the non-sending actions of ej, then executing
  ci := 1 + max{ci, max{t | (Pk', (d, t), Pi') is consumed in ej'}}
(where the maximum of an empty set is taken to be 0), and then carrying out the sending actions of ej, but sending (d, ci) instead of d.

l(ej) is defined to be the value of ci when execution of ej' finishes.

Lemma 7

If e → f in the evolution of S, then l(e) < l(f). The converse does not hold, however.

Example (Send-Consume Diagram)

[Figure: send-consume diagram for processes P1 and P2. Each event is labelled with its logical-clock value (P1: 1, 2, 5, 6, 7, 8; P2: 2, 3, 4, 7, 8), and two events are marked e (the second event of P1) and f (the second event of P2).]
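The clock rule can be checked mechanically. The sketch below encodes the send-consume diagram above as a trace. The encoding is one reading of the diagram and is an assumption: event (p, j) is the j-th event of Pp, counting from 1, and CONSUMES maps each consuming event to the events that created the messages it consumes. Since the clock before ej' equals the value of l at Pi's previous event, and each timestamp t equals the value of l at the creating event, l can be computed recursively. Happens-before is computed as the transitive closure of the two clauses of the definition. The helper names are hypothetical.

```python
from functools import lru_cache
from itertools import product

NUM_EVENTS = {1: 6, 2: 5}   # P1 performs 6 events, P2 performs 5 (assumed)
CONSUMES = {                # consuming event -> events that created the messages
    (2, 1): [(1, 1)],
    (1, 3): [(2, 3)],
    (1, 4): [(2, 2)],
    (2, 4): [(1, 4)],
}
ALL_EVENTS = [(p, j) for p, n in NUM_EVENTS.items() for j in range(1, n + 1)]


def direct_preds(ev):
    """The previous event of the same process (if any) and the creators of
    the messages consumed in ev."""
    p, j = ev
    return CONSUMES.get(ev, []) + ([(p, j - 1)] if j > 1 else [])


@lru_cache(maxsize=None)
def lamport(ev):
    """l(ev) = 1 + max of the clock before ev and every timestamp consumed."""
    return 1 + max((lamport(q) for q in direct_preds(ev)), default=0)


def happens_before(e, f):
    """e -> f iff e is reachable from f by following direct predecessors."""
    stack, seen = list(direct_preds(f)), set()
    while stack:
        q = stack.pop()
        if q == e:
            return True
        if q not in seen:
            seen.add(q)
            stack.extend(direct_preds(q))
    return False


# Lemma 7: e -> f implies l(e) < l(f), for every pair of events.
assert all(lamport(e) < lamport(f)
           for e, f in product(ALL_EVENTS, repeat=2) if happens_before(e, f))

e, f = (1, 2), (2, 2)   # the events marked e and f in the diagram
print(lamport(e), lamport(f), happens_before(e, f))  # 2 3 False
```

The computed values reproduce the labels in the diagram. The last line shows why the converse of Lemma 7 fails: l(e) < l(f), yet e does not happen before f.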

We derive an evolution of a system S''. Each Pi'' has a vector clock, an array variable vi[1..n], initially vi[j] = 0 for 1 ≤ j ≤ n. If Pi performs eh, then Pi'' performs eh'' by carrying out the non-sending actions of eh, then executing
  vi[i] := vi[i] + 1   and, for j ≠ i,
  vi[j] := max{vi[j], max{tj | (Pk'', (d, ⟨t1, ..., tn⟩), Pi'') is consumed in eh''}}

and then carrying out the sending actions of eh, but sending (d, vi) instead of d. v(eh) is defined to be the value of vi when execution of eh'' finishes.

Note: In any configuration of the evolution of S'', vj[i] ≤ vi[i].

Define v ≤ w if v[i] ≤ w[i] for all i, and v < w if v ≤ w and v ≠ w. Also, v and w are incomparable if neither v ≤ w nor w ≤ v.

Lemma 8

e → f in the evolution of S iff v(e) < v(f).

Proof (sketch)

Corollary 9

e and f are concurrent, i.e. neither e → f nor f → e, iff v(e) and v(f) are incomparable.
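Lemma 8 and Corollary 9 can be checked exhaustively on a small trace. This sketch assumes the same reading of the send-consume diagram in the logical-clock example above. Event (p, j) is the j-th event of Pp, and the CONSUMES map is an assumed encoding. vector applies the update rule literally: entries j ≠ i take the maximum with every consumed timestamp, and Pi's own entry is incremented. The helper names are illustrative.

```python
from functools import lru_cache
from itertools import product

N = 2                        # number of processes
NUM_EVENTS = {1: 6, 2: 5}
CONSUMES = {(2, 1): [(1, 1)], (1, 3): [(2, 3)], (1, 4): [(2, 2)], (2, 4): [(1, 4)]}
ALL_EVENTS = [(p, j) for p, n in NUM_EVENTS.items() for j in range(1, n + 1)]


def direct_preds(ev):
    p, j = ev
    return CONSUMES.get(ev, []) + ([(p, j - 1)] if j > 1 else [])


def happens_before(e, f):
    """e -> f iff e is reachable from f by following direct predecessors."""
    stack, seen = list(direct_preds(f)), set()
    while stack:
        q = stack.pop()
        if q == e:
            return True
        if q not in seen:
            seen.add(q)
            stack.extend(direct_preds(q))
    return False


@lru_cache(maxsize=None)
def vector(ev):
    """v(ev) for ev = (p, j); list index p - 1 holds Pp's own entry."""
    p, j = ev
    v = list(vector((p, j - 1))) if j > 1 else [0] * N
    for sender in CONSUMES.get(ev, []):
        t = vector(sender)                  # timestamp carried by the message
        for x in range(N):
            if x != p - 1:                  # own entry is only incremented
                v[x] = max(v[x], t[x])
    v[p - 1] += 1
    return tuple(v)


def leq(v, w):
    return all(a <= b for a, b in zip(v, w))


def less(v, w):
    return leq(v, w) and v != w


def incomparable(v, w):
    return not leq(v, w) and not leq(w, v)


pairs = list(product(ALL_EVENTS, repeat=2))
# Lemma 8: e -> f iff v(e) < v(f).
assert all(happens_before(e, f) == less(vector(e), vector(f)) for e, f in pairs)
# Corollary 9: distinct e and f are concurrent iff v(e), v(f) are incomparable.
assert all((not happens_before(e, f) and not happens_before(f, e))
           == incomparable(vector(e), vector(f)) for e, f in pairs if e != f)
print(vector((1, 2)), vector((2, 2)))   # (2, 0) (1, 2)
```

Unlike logical clocks, the vectors of the two marked events e and f are incomparable. This exposes their concurrency.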




Suppose the links between the processes of S are FIFO queues. For convenience, assume that when an event involves the creation of a message m from Pi to Pj, it also involves the delivery of m to the FIFO queue qij, from which Pj can consume messages.

A cut is a (finite) subset of {e0, e1, ...} that is the union of finite initial segments e_i^0, ..., e_i^s(i) of the events of each Pi, for 1 ≤ i ≤ n. A cut determines a configuration: each Pi is in the state reached after e_i^s(i) has occurred. A cut K is consistent if e ∈ K and e' → e imply e' ∈ K.



Example

[Figure: send-consume diagram for processes P_1 and P_2 showing two cuts, cut 1 (inconsistent) and cut 2 (consistent).]

A configuration is consistent if it is the configuration determined by a consistent cut.

The Chandy-Lamport distributed snapshot algorithm can be used to construct a consistent configuration in an evolution of an asynchronous system.[2]

For concreteness, suppose P1,...,Pn are bank processes that send one another amounts of money from accounts they maintain. Assume that if Pi and Pj are adjacent, then Pi sends infinitely many messages to Pj, and assume that each Pi consumes from each qki infinitely often.

Introduce a monitor process P0, which is directly connected to each Pi. P0 acts as follows (repeatedly): [...]

The behaviour of Pi (1 ≤ i ≤ n) is augmented as follows: [...]

Global snapshots can be used to detect whether a stable property holds, e.g. whether there is a deadlock in the system. (Suggested reading: Mullender ch. 4, Lynch ch. 19.)
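The following is a sketch of the standard marker-based formulation of the Chandy-Lamport algorithm, which may differ in detail from the monitor-based version described here. It assumes a complete graph of FIFO queues, random transfers between bank processes, and process 0 as the initiator. All of these, and the function name snapshot_total, are illustrative choices. A process records its balance when it first sees a marker (or when it initiates) and sends a marker on every outgoing queue. It then records the amounts that arrive on each incoming queue until the marker arrives on that queue. Transfers only move money, so for a consistent configuration the recorded balances plus the recorded amounts in transit must add up to the total.

```python
import random
from collections import deque

MARKER = object()  # marker token, distinct from any amount of money


def snapshot_total(n=4, initial=100, seed=0, max_steps=100_000):
    """Random transfers plus one snapshot started by process 0. Returns
    (money in the snapshot, money in the system, snapshot complete?)."""
    rng = random.Random(seed)
    balance = [initial] * n
    q = {(i, j): deque() for i in range(n) for j in range(n) if i != j}
    recorded = [None] * n  # recorded balance of each process
    chan_state = {}        # (i, j) -> amounts recorded as in transit on q_ij
    closed = set()         # queues on which the marker has been consumed

    def record(p):
        recorded[p] = balance[p]
        for r in range(n):
            if r != p:
                q[(p, r)].append(MARKER)  # marker on every outgoing queue
                chan_state[(r, p)] = []   # start recording every incoming queue

    record(0)
    for _ in range(max_steps):
        if len(closed) == len(q):
            break                         # every queue's state is final
        busy = [c for c, queue in q.items() if queue]
        if not busy or rng.random() < 0.3:
            src = rng.randrange(n)        # a random transfer
            dst = rng.choice([r for r in range(n) if r != src])
            amount = rng.randint(0, balance[src])
            balance[src] -= amount
            q[(src, dst)].append(amount)
        else:
            src, dst = rng.choice(busy)   # consume the head of a FIFO queue
            msg = q[(src, dst)].popleft()
            if msg is MARKER:
                if recorded[dst] is None:
                    record(dst)           # so queue (src, dst) is recorded empty
                closed.add((src, dst))
            else:
                balance[dst] += msg
                if recorded[dst] is not None and (src, dst) not in closed:
                    chan_state[(src, dst)].append(msg)
    in_snapshot = (sum(x for x in recorded if x is not None)
                   + sum(sum(v) for v in chan_state.values()))
    return in_snapshot, n * initial, len(closed) == len(q)


print(snapshot_total(seed=0))
```

The FIFO assumption matters here. A message sent after its sender recorded travels behind that sender's marker, so it is never counted twice.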

4  Commit Protocols

Consider a synchronous distributed database whose component processes can fail by stopping. To maintain consistency, no transaction may be committed by one process but aborted by another. We consider first the two-phase commit protocol, which ensures that, if no process fails, a configuration is reached in which every process has committed or every process has aborted. One process is designated the coordinator.

In phase 1, the coordinator sets its status to whichever of can commit and must abort is appropriate, and each non-coordinator does likewise and sends its status to the coordinator. If the coordinator's status is can commit and it receives notification from each non-coordinator that its status is can commit, then the decision is commit; otherwise the decision is abort. In phase 2, the coordinator sends the decision to each non-coordinator, which acts on it.

Lemma 10

If in an admissible evolution of the two-phase commit protocol no process fails, then a configuration is reached in which every process has committed or every process has aborted.
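The decision rule of the two phases fits in a few lines. This is a minimal sketch, assuming process 0 is the coordinator. It also assumes that a crashed non-coordinator stops before phase 1 and sends nothing. In a synchronous system the coordinator can treat the missing notification as a failure to confirm. Coordinator failure is not modelled, since that is the case the three-phase protocol addresses. The function name and the crash model are illustrative.

```python
from itertools import product


def two_phase_commit(can_commit, crashed=frozenset()):
    """can_commit[i] is the status process i sets in phase 1. Processes in
    `crashed` (assumed non-coordinators) send nothing. Returns each process's
    outcome, "commit" or "abort", or None for a crashed process."""
    k = len(can_commit)
    # Phase 1: each live non-coordinator sends its status to the coordinator.
    received = {i: can_commit[i] for i in range(1, k) if i not in crashed}
    # Commit only if the coordinator can commit and every notification says so;
    # a missing notification counts against committing.
    commit = can_commit[0] and len(received) == k - 1 and all(received.values())
    decision = "commit" if commit else "abort"
    # Phase 2: the coordinator sends the decision; each live process acts on it.
    return [None if i in crashed else decision for i in range(k)]


# Lemma 10: with no failures, every process ends with the same outcome.
for votes in product([True, False], repeat=3):
    outcomes = two_phase_commit(list(votes))
    assert len(set(outcomes)) == 1
```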




The three-phase commit protocol modifies and extends the two-phase commit protocol to take account of the possibility that the coordinator may fail. It achieves the stronger property that a configuration is reached in which every process that has not failed has committed or aborted, and in which no process has committed while another has aborted.

Suggested reading: Lynch, section 7.


[1] A multiset (also called a bag) is like an ordinary set, except that it can contain multiple copies of the same element.
[2] Hooray.
