\documentclass[a4paper,11pt]{article}

\renewcommand{\b}[1]{\ensuremath{\mathbf{#1}}}
\newcommand{\event}[1]{\stackrel{#1}{\longrightarrow}}
\newcommand{\mi}[1]{\mathit{#1}}
\newtheorem{lemma}{Lemma}
\newtheorem{corollary}[lemma]{Corollary}

\usepackage{a4wide}

\begin{document}

\title{Distributed Systems}
\maketitle

\section*{Books}

\begin{itemize}

\item \textit{Distributed Computing} -- H.~Attiya and J.~Welch -- McGraw-Hill,
1998, ch. 1

\item \textit{Concurrent Systems} -- J.~Bacon -- Addison-Wesley 1994, ch. 1

\item \textit{Distributed Systems} -- Coulouris et al. -- A-W 1993, ch. 1

\item \textit{Distributed Systems} -- Mullender (ed.) -- A-W 1993, ch. 1 and 2

\item \textit{Distributed Algorithms} -- N.~Lynch -- Morgan Kaufmann 1996, ch. 1

\end{itemize}

\section*{Introduction}

A distributed system is a system composed of several computers that
are connected in some way and that together provide some service or
achieve some goal, e.g.

\begin{itemize}
\item communication, e.g. cellphones, email
\item process control, e.g. in aircraft
\item information processing, e.g. financial transactions
\item scientific computing, e.g. weather forecasting
\end{itemize}

The hardware of a distributed system may for instance be a VLSI chip,
a shared-memory multiprocessor machine, a LAN of workstations or PCs,
or a global network.

In this course we study some ideas, concepts and techniques that are
useful in understanding, designing and implementing distributed
systems.

\section{A Mathematical Model}

We begin by describing a mathematical model that can be used to
express many distributed systems.

\medskip

Consider a finite connected graph $(\b{P}, \b{L})$. We call the nodes
\emph{processes} and the edges \emph{links}. A link makes possible
communication of data from a set \b{D} between the processes that it
joins.

\textbf{Example: } Suppose the graph has 12 processes $P_0,\ldots
,P_{11}$ connected in a ring, i.e. $\b{P} = \{P_0,\ldots ,P_{11}\}$ and
$\b{L} = \{(P_i, P_{i+1}) | 0 \leq i < 12\}$ where arithmetic is mod
12. Let $\b{D} = \b{N} \cup \{$done$\}$.

\medskip
[Diagram goes here]
\medskip

The set \b{M} of messages is \b{P \times D \times P}.

\textbf{Example: }
The message $(P_3, $done$, P_4)$ is the datum \emph{done} sent from
$P_3$ to $P_4$.

\medskip

The \emph{state} of a process at a given moment is determined by the
control state it is in, the data it is storing, and the arrived
messages that have been delivered to it but not examined by it.

\textbf{Example: } Suppose each $P_i$ has an associated name $n_i \in
\b{N}$. Let the set \b{S} of states be $\b{Q} \times \b{N} \times
\mathcal{F}(\b{M})$ where $\b{Q} = \{$init, active, leader, follower$\}$
and $\mathcal{F}(\b{M})$ is the set of all finite multisets\footnote{A
multiset (a.k.a. a bag) is like a set, except that it may contain
multiple copies of the same element.} of messages. A state of
$P_2$ might be (active, 20, $\{(P_1, 0, P_2), (P_1, 10, P_2)\})$.

\medskip

A \emph{configuration} of a distributed system describes the state of
each of its processes and the collection of messages that have been
sent but not delivered.

\textbf{Example: }The set \b{C} of configurations is $\b{S}^{12}
\times \cal{F}(\b{M})$.

\medskip

Each process has a \emph{transition function} that determines what, if
anything, it may do in a given state. Transition functions are
sometimes described explicitly, as in the next example, and sometimes
implicitly, by some text, pseudo-code, or program.

\medskip

\textbf{Example: } The transition function of $P_i$, for $0 \leq i <
12$, is ${\delta}_i : \b{S \to S} \times \cal{F}(\b{M})$ defined
by:

\begin{tabbing}
${\delta}_i$(init, n, in) = ((active, $n$, in), $\{(P_i, n,
P_{i+1})\})$ \\

${\delta}_i(q, n, $in$) = $\=$ ((q, n, \emptyset), \emptyset)$ \\

\> if $q$=leader or $q$=follower or ($q$=active and in=$\emptyset$) \\

${\delta}_i($active$, n, $in$) =$ \\
wibble \= \+ \kill
if done $\in$ data(in) then $(($follower$, n, \emptyset), \{(P_i,
$done$, P_{i+1})\})$ \\
else if $n \in $data(in) then $(($leader$, n, \emptyset),\{(P_i,
$done$, P_{i+1})\})$ \\
else if $n > $max(in) then $(($active$, n, \emptyset), \emptyset)$ \\
else if $n < $max(in) then $(($active$, n, \emptyset), \{(P_i,
$max(in)$, P_{i+1})\})$ \\
jam \= \+ \kill
where \=data(in) is the set of data in the message set \emph{in}, \\
and \> max(in) is the largest integer in that set (and in $\neq
\emptyset$) \\
\end{tabbing}
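
\medskip

\noindent\textbf{Example: } As a check on the definition, here is how
${\delta}_2$ evaluates on two sample states (the message contents are
chosen purely for illustration). Take the state of $P_2$ given
earlier, (active, 20, in) with in $= \{(P_1, 0, P_2), (P_1, 10,
P_2)\}$. Then data(in) $= \{0, 10\}$ and max(in) $= 10$. Neither done
nor 20 belongs to data(in), and $20 > 10$, so $P_2$ stays active and
sends nothing:
\[ {\delta}_2(\mbox{active}, 20, \mbox{in}) =
   ((\mbox{active}, 20, \emptyset), \emptyset). \]
If instead in $= \{(P_1, 35, P_2)\}$, then $20 < $ max(in) $= 35$, so
the last branch applies and $P_2$ forwards the larger name to its
neighbour:
\[ {\delta}_2(\mbox{active}, 20, \mbox{in}) =
   ((\mbox{active}, 20, \emptyset), \{(P_2, 35, P_3)\}). \]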

\pagebreak

Very rough sketch of ${\delta}_i$:
\begin{verbatim}
                        /~\
                        | | 
                        | v 
              init -> active -> leader <-\
                         |            \__/
                         v
                      follower <-\            [mmmm, ASCII]
                              \__/
\end{verbatim}

There are two kinds of event:

\begin{itemize}
\item \emph{Computation events}, where a process makes a transition
\item \emph{Delivery events}, where a message is delivered to a process
\end{itemize}

If an event $e$ is possible in a configuration $C$, it may occur and
result in a configuration $C'$, written $C \event{e} C'$.

\subsubsection*{Example}
\begin{enumerate}

\item Suppose $C = (S_0,\ldots,S_{11},U)$ and ${\delta}_i(S_i) = (S_i',
\mi{out})$.
Then $C \event{\mi{comp}(i)} C' = (S_0, \ldots, S_i', \ldots,
S_{11}, U \cup \mi{out})$.

\item Suppose $C = (S_0, \ldots, S_{11}, U \cup \{m\})$ where $m =
(P_j, d, P_i)$. Then $C \event{\mi{del}(m)} C' = (S_0, \ldots, S_i', \ldots,
S_{11}, U)$ where if $S_i = (q, n, \mi{in})$ then $S_i' = (q, n, \mi{in} \cup
\{m\})$.

\end{enumerate}
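
\medskip

\noindent\textbf{Example: } To see both kinds of event on the ring,
suppose (purely for illustration) that $n_3 = 7$, and that in $C$ we
have $S_3 = (\mi{init}, 7, \emptyset)$, $S_4 = (\mi{init}, n_4,
\emptyset)$ and $U = \emptyset$. Since ${\delta}_3(S_3) =
((\mi{active}, 7, \emptyset), \{(P_3, 7, P_4)\})$, the event
$\mi{comp}(3)$ leads to a configuration $C'$ in which $P_3$ is in state
$(\mi{active}, 7, \emptyset)$ and the set of undelivered messages is
$\{m\}$ with $m = (P_3, 7, P_4)$. The event $\mi{del}(m)$ then leads to
a configuration $C''$ in which $P_4$ is in state $(\mi{init}, n_4,
\{m\})$ and no message is undelivered. A later computation event of
$P_4$ is needed before $m$ is examined.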

\bigskip

\b{Q} has a nonempty subset $\b{Q}_{\mi{init}}$ of initial control
states, and a subset $\b{Q}_{\mi{final}}$ of final control states. A
state is \emph{initial} (\emph{final}) if its control state is. A
configuration is \emph{initial} (\emph{final}) if the state of each of
its processes is.

\textbf{Example: } $\b{Q}_{\mi{init}} = \{\mi{init}\}$ and
$\b{Q}_{\mi{final}} = \{\mi{leader}, \mi{follower}\}$.

\noindent Also, $\b{S}_{\mi{init}} = \b{Q}_{\mi{init}} \times \b{N} \times
\{\emptyset\}$ and $\b{S}_{\mi{final}} = \b{Q}_{\mi{final}} \times \b{N}
\times \cal{F}(\b{M})$.

\noindent Also, $\b{C}_{\mi{init}} = (\b{S}_{\mi{init}})^{12} \times
\cal{F}(\b{M})$ and $\b{C}_{\mi{final}} = (\b{S}_{\mi{final}})^{12}
\times \cal{F}(\b{M})$.

\bigskip

In general, if $\delta$ is a transition function, $S$ is a final
state, and $\delta(S) = (S', \mi{out})$, then $S'$ is final. Hence, if
$C$ is final and $C \event{e} C'$, then $C'$ is final.

An \emph{evolution} of a distributed system described by a
configuration $C_0$ is of the form $C_0 \event{e_0} C_1 \event{e_1}
C_2 \event{e_2} \ldots$. An evolution $C_0 \event{e_0} C_1 \event{e_1}
\ldots$ is \emph{terminating} if there exists $j$ such that $C_j$ is
final.

If $C_0$ describes an \emph{asynchronous} system then an evolution
$C_0 \event{e_0} C_1 \event{e_1} \ldots$ is \emph{admissible} if:

\begin{enumerate}
\item For each process $P_i$ there are infinitely many $j$ such that
$e_j = \mi{comp}(i)$.
\item If $e_j = \mi{comp}(i)$ and $m$ is created in $e_j$, then $e_k =
\mi{del}(m)$ for some $k$.
\end{enumerate}

In an admissible evolution of an asynchronous system, each process
performs infinitely many transitions, and every message that is created
is delivered. There need be no finite upper bound on the number of
events between two transitions of a process or between the creation
and the delivery of a message.

\bigskip

If $C_0$ describes a \emph{synchronous} system then an evolution $C_0
\event{e_0} C_1 \event{e_1} \ldots$ is \emph{admissible} if it
consists of a sequence of \emph{rounds}. In a round,

\begin{enumerate}
\item all outstanding messages are delivered, and then
\item each process performs a computation step which results in at
most one message being created, addressed to any given process.
\end{enumerate}

There is a fixed bound on the number of events between the creation
and delivery of a message.

\section{Leader Election}

The \emph{leader election problem} is to design a process $P$ that stores
just two data -- its name $n:\b{N}$ and its status $s:\{\mi{null},
\mi{leader}, \mi{follower}\}$ -- so that for any $k>1$, if $S$ is a
system consisting of $k$ copies of $P$ -- that may differ only in
storing different names -- connected in a ring, and in which there are
initially no undelivered messages, then every admissible evolution of
$S$ contains a final configuration in which the status of one $P$ is
$\mi{leader}$, and the status of every other $P$ is $\mi{follower}$.

\subsection*{A. The synchronous case}

Fix $k>1$. Choose distinct $n_0, \ldots, n_{k-1}$. $P$ has variables
$s$, initially $\mi{null}$, and $n$, initially $n_i$ in $P_i$. $P$'s
transitions are described as follows, where $c$ refers to the
clockwise neighbour:

\begin{eqnarray*}
\mi{init} & = & c!n \to \mi{active} \\
\mi{leader} & = & ?\mi{in} \to \mi{leader} \\
\mi{follower} & = & ?\mi{in} \to \mi{follower} \\
\mi{active} & = & ?\mi{in} \to \mi{IF} ( \\
& & \mi{done} \in \mi{data}(\mi{in}): s:=\mi{follower} \to c!\mi{done}
\to \mi{follower} \\
& & n \in \mi{data}(\mi{in}) : s:= \mi{leader} \to c!\mi{done} \to
\mi{leader} \\
& & n < \max(\mi{in}) : c!\max(\mi{in}) \to \mi{active} \\
& & n > \max(\mi{in}) : \mi{active})
\end{eqnarray*}

[Notes: (1) The IF construct means ``if\ldots then\ldots elsif\ldots
then\ldots elsif\ldots then\ldots'' and so on. (2) An empty set
doesn't actually have a maximum, so the IF wouldn't appear to cover
the case where $\mi{in}=\emptyset$. It works (I think) if you pretend
that $\max(\emptyset) = -1$, or that the last branch will always be
executed if $\mi{in}=\emptyset$.]

Let $n* = \max\{n_0, \ldots, n_{k-1}\}$, and suppose $n* = n_m$.

{
\renewcommand{\labelenumi}{(\alph{enumi})}
\renewcommand{\labelenumii}{(\roman{enumii})}

\subsubsection*{Lemma 1} Suppose $0 \le r < k$.
\begin{enumerate}
 \item In round $r$,
 \begin{enumerate}
  \item $P_{m+r}$ sends $n*$
  \item If $j \ne m+r$ then $P_j$ does not send $n*$
  \item No $P_j$ sends $\mi{done}$
  \item If $j \ne m$ then no $P_i$ with $i \in [m\ldots j-1]$ sends $n_j$
 \end{enumerate}
\item At the end of round $r$, for each $j$,
 \begin{enumerate}
  \item $P_j$ is in control state $\mi{active}$.
  \item $s_j = \mi{null}$
 \end{enumerate}
\end{enumerate}

\subsubsection*{Lemma 2} Suppose $0 \le r < k$.
\begin{enumerate}
 \item In round $k+r$, $P_{m+r}$ sends $\mi{done}$.
 \item At the end of round $k+r$,
 \begin{enumerate}
  \item $P_m$ is in control state $\mi{leader}$ and $s_m = \mi{leader}$
  \item If $j \in (m,m+r]$ then $P_j$ is in control state $\mi{follower}$
        and $s_j = \mi{follower}$
  \item If $j \in (m+r, m)$ then $P_j$ is in control state $\mi{active}$ and 
        $s_j = \mi{null}$
 \end{enumerate}
\end{enumerate}

\subsubsection*{Corollary 3} At the end of round $2k-1$,
\begin{enumerate}
\item $s_m = \mi{leader}$
\item If $j \ne m$ then $s_j = \mi{follower}$
\end{enumerate}
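
\medskip

\noindent\textbf{Example: } A hand trace for $k = 3$ may help in
reading Lemmas 1 and 2. The names $n_0 = 5$, $n_1 = 9$, $n_2 = 2$ are
chosen arbitrarily for illustration, so $m = 1$ and $n* = 9$; indices
are taken mod 3, and an empty $\mi{in}$ is handled as in Note (2).
Each row lists the messages created in that round.

\begin{center}
\begin{tabular}{c|l|l}
round & messages created & status change \\ \hline
0 & $(P_0,5,P_1)$, $(P_1,9,P_2)$, $(P_2,2,P_0)$ & none \\
1 & $(P_2,9,P_0)$ & none \\
2 & $(P_0,9,P_1)$ & none \\
3 & $(P_1,\mi{done},P_2)$ & $s_1 := \mi{leader}$ \\
4 & $(P_2,\mi{done},P_0)$ & $s_2 := \mi{follower}$ \\
5 & $(P_0,\mi{done},P_1)$ & $s_0 := \mi{follower}$
\end{tabular}
\end{center}

In round 1, $P_0$ and $P_1$ receive names smaller than their own and
so send nothing. Rounds 0--2 agree with Lemma 1 ($P_{m+r}$ sends $n*$
in round $r$), rounds 3--5 agree with Lemma 2, and the statuses at the
end of round $5 = 2k-1$ agree with Corollary 3.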

\subsubsection*{Message complexity}

The number of messages created in the first $2k$ rounds is $O(k^2)$.
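
\medskip

\noindent\textbf{Sketch of a count: } The following rough argument,
under the reading of the algorithm given above, indicates where the
bound comes from. A name $n_j$ travels clockwise from $P_j$ and is
discarded by the first process whose own name is larger, so it is sent
at most $k$ times. There are $k$ names, and by Lemma 2 exactly $k$
$\mi{done}$ messages are created in rounds $k,\ldots,2k-1$, so at most
$k^2 + k$ messages are created in all. The quadratic order is attained
when the names decrease clockwise, $n_0 > n_1 > \cdots > n_{k-1}$:
then $n_0$ is sent $k$ times and $n_i$ is sent $k-i$ times for $1 \le
i < k$, which together with the $k$ $\mi{done}$ messages gives
\[ k + \sum_{i=1}^{k-1}(k-i) + k \;=\; \frac{k(k-1)}{2} + 2k
   \;=\; \frac{k(k+3)}{2}. \]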

\subsection*{B. The asynchronous case}

Consider the system $S$, but now viewed as being asynchronous.

\subsubsection*{Lemma 4}

Suppose $C$ is a configuration in an admissible evolution of $S$.

\begin{enumerate}
\item $P_j$ is $\mi{leader}$ iff $s_j = \mi{leader}$.
\item $P_j$ is $\mi{follower}$ iff $s_j = \mi{follower}$.
\item $P_j$ is $\mi{init}$ or $\mi{active}$ iff $s_j = \mi{null}$.
\item If $C$ contains $(P_i, n*, P_{i+1})$ then $P_m$ is active and
$s_j = \mi{null}$ for $j \ne m$.
\item $C$ contains at most one message of the form $(P_i, n*,
P_{i+1})$.
\item If $j \ne m$ and $i \in [m \ldots j-1]$, then $C$ does not
contain $(P_i, n_j, P_{i+1})$.
\item If $j \ne m$ then $s_j \ne \mi{leader}$.
\item If $C$ contains $(P_{m+j}, \mi{done}, P_{m+j+1})$, where $0 \le
j < k$, then $s_m = \mi{leader}$ and $s_i = \mi{follower}$ for $i \in
(m, m+j]$ and $P_i$ is $\mi{active}$ for $i \in (m+j, m)$.
\item $C$ contains at most one message of the form $(P_i, \mi{done},
P_{i+1})$.
\end{enumerate}

\subsubsection*{Lemma 5}

Suppose $C_0 \event{e_0} C_1 \event{e_1} \ldots$ is an admissible
evolution of $S$.

\begin{enumerate}
\item If $0 \le j<k$ then some $C_h$ contains an undelivered message
$(P_{m+j}, n*, P_{m+j+1})$.
\item If $0 \le j<k$ then in some $C_h$, $s_m = \mi{leader}$ and $s_i
= \mi{follower}$ for $i \in (m, m+j]$ and there is an undelivered
message $(P_{m+j}, \mi{done}, P_{m+j+1})$
\end{enumerate}

}

\subsubsection*{Corollary 6}

Every admissible evolution of $S$ contains a configuration in which
$s_m = \mi{leader}$ and $s_j = \mi{follower}$ for $j \ne m$.

\medskip

Further reading on Leader Election: Lynch ch. 3 and 15, Attiya \&
Welch ch. 3

\section{Logical clocks and vector clocks}

Suppose $C_0 \event{e_0} C_1 \event{e_1} \ldots$ is an evolution of an
asynchronous system $S$ consisting of $n$ processes $P_1, \ldots,
P_n$. Assume that $e_i \ne e_j$ if $i \ne j$ and that no two messages
created in the evolution are identical. Let $e^i_0, e^i_1, \ldots$ be
the events performed by $P_i$ (in order of occurrence in the
evolution).

\subsubsection*{Definition}
The \emph{happens before} relation is the smallest transitive relation
$\to$ such that:
\begin{enumerate}
\item $e^i_j \to e^i_{j+1}$ for all $i, j$, and
\item if $m$ is created in $e$ and consumed in $e'$ then $e \to e'$.
\end{enumerate}

\textbf{References} Lynch 18, A\&W 6, Coulouris 10, Mullender 4

We derive an evolution of a system $S'$. Each $P_i'$ has a logical
clock, an integer variable $c_i$, initially 0. If $P_i$ performs
$e_j$ then $P_i'$ performs $e_j'$ by carrying out the non-sending
actions of $e_j$, then executing
 $c_i := 1 + \max\{c_i,
\max\{t \mid (P_k', (d,t), P_i')
$ is consumed in $ e_j'\}\}$ (the inner maximum being omitted if
$e_j'$ consumes no message), and then carrying out the sending
actions of $e_j$, but sending $(d, c_i)$ instead of $d$.

$\lambda(e_j)$ is defined to be the value of $c_i$ when execution of
$e_j'$ finishes.

\subsubsection*{Lemma 7}
If $e \to f$ in the evolution of $S$ then $\lambda(e) <
\lambda(f)$. The converse does not hold, however.

\textbf{Example} (Send-Consume Diagram)

\begin{figure}[h]
\setlength{\unitlength}{3200sp}%
%
\begingroup\makeatletter\ifx\SetFigFont\undefined%
\gdef\SetFigFont#1#2#3#4#5{%
  \reset@font\fontsize{#1}{#2pt}%
  \fontfamily{#3}\fontseries{#4}\fontshape{#5}%
  \selectfont}%
\fi\endgroup%
\begin{picture}(7147,2085)(76,-3961)
\thicklines
\put(1201,-2161){\circle*{150}}
\put(1801,-3661){\circle*{150}}
\put(2101,-2161){\circle*{150}}
\put(2401,-3661){\circle*{150}}
\put(2701,-3661){\circle*{150}}
\put(3301,-2161){\circle*{150}}
\put(3901,-2161){\circle*{150}}
\put(4501,-2161){\circle*{150}}
\put(6301,-3661){\circle*{150}}
\put(5326,-3661){\circle*{150}}
\put(5776,-2161){\circle*{150}}
\put(601,-2161){\line( 1, 0){6600}}
\put(601,-3661){\line( 1, 0){6600}}
\put(2401,-3661){\vector( 1, 1){1425}}
\put(1276,-2236){\vector( 1,-3){450}}
\put(2776,-3586){\vector( 1, 3){450}}
\put(3976,-2236){\vector( 1,-1){1275}}
\put(4426,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}7}}}
\put(5701,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}8}}}
\put(1726,-3961){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}2}}}
\put(2326,-3961){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}3}}}
\put(2626,-3961){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}4}}}
\put(1126,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}1}}}
\put(2026,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}2}}}
\put(3226,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}5}}}
\put(3826,-2011){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}6}}}
\put(5251,-3961){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}7}}}
\put(6301,-3961){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}8}}}
\put(2101,-2461){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}$e$}}}
\put(2251,-3511){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}$f$}}}
\put(296,-2236){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}$P_1$}}}
\put(296,-3736){\makebox(0,0)[lb]{\smash{\SetFigFont{12}{14.4}{\rmdefault}{\mddefault}{\updefault}$P_2$}}}
\end{picture}
\end{figure}
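
\noindent\textbf{Reading the diagram: } The number beside each event
is its $\lambda$-value, and each arrow runs from the event that
creates a message to the event that consumes it. Write
$a_1,\ldots,a_6$ for the events of $P_1$ and $b_1,\ldots,b_5$ for
those of $P_2$, left to right (these labels are introduced here for
convenience; $e = a_2$ and $f = b_2$). The update rule then gives, for
the four consuming events,
\begin{eqnarray*}
\lambda(b_1) &=& 1 + \max\{0, \lambda(a_1)\} = 1 + \max\{0,1\} = 2, \\
\lambda(a_3) &=& 1 + \max\{\lambda(a_2), \lambda(b_3)\} = 1 + \max\{2,4\} = 5, \\
\lambda(a_4) &=& 1 + \max\{\lambda(a_3), \lambda(b_2)\} = 1 + \max\{5,3\} = 6, \\
\lambda(b_4) &=& 1 + \max\{\lambda(b_3), \lambda(a_4)\} = 1 + \max\{4,6\} = 7,
\end{eqnarray*}
while every other event simply adds 1 to the previous value on its
own line. The pair $e$, $f$ shows why the converse of Lemma 7 fails:
$\lambda(e) = 2 < 3 = \lambda(f)$, yet no chain of arrows and
same-process steps leads from $e$ to $f$, so $e \not\to f$.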

We derive an evolution of a system $S''$. Each $P_i''$ has a vector
clock, an array variable $v_i[1..n]$, initially $v_i[j]=0$ for $1 \le
j \le n$. If $P_i$ performs $e_h$ then $P_i''$ performs $e_h''$ by
carrying out the non-sending actions of $e_h$, then executing
\begin{eqnarray*}
v_i[i] &:=& v_i[i] + 1 \quad\mbox{and for}\quad j \ne i,\\
v_i[j] &:=& \max\{v_i[j],
  \max\{t_j | (P_k'',(d,\langle t_1..t_n\rangle),P_i'')
  \;\mbox{is consumed in}\; e_h''\}\}
\end{eqnarray*}

and then carrying out the sending actions of $e_h$, but sending
$(d,v_i)$, instead of $d$. $v(e_h)$ is defined to be the value of
$v_i$ when execution of $e_h''$ finishes.

\textbf{Note: } In any configuration of the evolution of $S''$,
$v_j[i] \le v_i[i]$.

Define $v \le w$ if $v[i] \le w[i] \;\forall i$, and $v<w$ if $v \le w$
and $v \ne w$. Also, $v$ and $w$ are \emph{incomparable} if $v \not\le
w$ and $w \not\le v$.

\subsubsection*{Lemma 8}
$e \to f$ in the evolution of $S$ iff $v(e)<v(f)$.

\noindent\textbf{Proof} (sketch)
\begin{itemize}
\item[$\Rightarrow$] From the definitions.
\item[$\Leftarrow$] Suppose $e \not\to f$ where $P_i$ performs $e$ and
$P_j$ performs $f$. Then $v(f)[i]<v(e)[i]$ so $v(e) \not\le v(f)$.
\end{itemize}

\subsubsection*{Corollary 9}
$e$ and $f$ are concurrent, i.e. $e \not\to f$ and $f \not\to e$, iff
$v(e)$ and $v(f)$ are incomparable.
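
\medskip

\noindent\textbf{Example: } Vector clocks for the send-consume diagram
given after Lemma 7 (so $n = 2$) can be worked out by hand. Write
$a_1,\ldots,a_6$ for the events of $P_1$ and $b_1,\ldots,b_5$ for
those of $P_2$, left to right (labels introduced here for convenience,
with $e = a_2$ and $f = b_2$). The messages created in $a_1$, $b_2$,
$b_3$, $a_4$ are consumed in $b_1$, $a_4$, $a_3$, $b_4$ respectively.
Applying the update rule event by event gives:

\begin{center}
\begin{tabular}{l|cccccc}
event & $a_1$ & $a_2$ & $a_3$ & $a_4$ & $a_5$ & $a_6$ \\ \hline
$v$ & $\langle 1,0\rangle$ & $\langle 2,0\rangle$ & $\langle 3,3\rangle$
    & $\langle 4,3\rangle$ & $\langle 5,3\rangle$ & $\langle 6,3\rangle$
\end{tabular}

\medskip

\begin{tabular}{l|ccccc}
event & $b_1$ & $b_2$ & $b_3$ & $b_4$ & $b_5$ \\ \hline
$v$ & $\langle 1,1\rangle$ & $\langle 1,2\rangle$ & $\langle 1,3\rangle$
    & $\langle 4,4\rangle$ & $\langle 4,5\rangle$
\end{tabular}
\end{center}

So $v(e) = \langle 2,0\rangle$ and $v(f) = \langle 1,2\rangle$ are
incomparable, and $e$ and $f$ are concurrent, even though the logical
clocks give $\lambda(e) < \lambda(f)$. By contrast, $v(b_2) = \langle
1,2\rangle < \langle 4,3\rangle = v(a_4)$, in agreement with $b_2 \to
a_4$.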

\medskip

Suppose the links between the processes of $S$ are FIFO queues. For
convenience, assume that when an event involves the creation of a
message $m$ from $P_i$ to $P_j$, it also involves the delivery of $m$
to the FIFO queue $q_{ij}$, from which $P_j$ can consume messages.

A \emph{cut} is a (finite) subset of $\{e_0,e_1,\ldots\}$ that is the
union of the events in finite initial segments
$e_0^i,\ldots,e^i_{s(i)}$ for $1 \le i \le n$. A cut determines a
configuration: each $P_i$ is in the state after $e^i_{s(i)}$ has
occurred. A cut $K$ is \emph{consistent} if $e \in K$ and $e' \to e$
implies $e' \in K$.

\medskip

\begin{figure}[h]
\textbf{Example}

\setlength{\unitlength}{3158sp}%
%
\begingroup\makeatletter\ifx\SetFigFont\undefined%
\gdef\SetFigFont#1#2#3#4#5{%
  \reset@font\fontsize{#1}{#2pt}%
  \fontfamily{#3}\fontseries{#4}\fontshape{#5}%
  \selectfont}%
\fi\endgroup%
\begin{picture}(8422,2562)(601,-3811)
\thicklines
\put(1501,-1561){\circle*{150}}
\put(2251,-3061){\circle*{150}}
\put(2851,-1561){\circle*{150}}
\put(3751,-3061){\circle*{150}}
\put(4201,-3061){\circle*{150}}
\put(6901,-3061){\circle*{150}}
\put(7651,-1561){\circle*{150}}
\put(8251,-3061){\circle*{150}}
\put(4801,-1561){\circle*{150}}
\put(5926,-1561){\circle*{150}}
\put(5326,-1561){\circle*{150}}
\put(901,-1561){\line( 1, 0){8100}}
\put(901,-3061){\line( 1, 0){8100}}
\put(1501,-1636){\vector( 1,-2){675}}
\put(3751,-3061){\vector( 3, 2){2025}}
\put(4276,-2986){\vector( 1, 3){450}}
\put(5476,-1636){\vector( 1,-1){1350}}
\thinlines
\put(4051,-3361){\line( 0, 1){825}}
\put(4051,-2536){\line( 3, 2){900}}
\put(4951,-1936){\line( 1, 4){150}}
\put(5551,-3361){\line( 0, 1){2100}}
\put(3976,-3586){\makebox(0,0)[lb]{\smash{\SetFigFont{10}{12.0}{\rmdefault}{\mddefault}{\updefault}cut 1}}}
\put(3976,-3811){\makebox(0,0)[lb]{\smash{\SetFigFont{10}{12.0}{\rmdefault}{\mddefault}{\updefault}(inconsistent)}}}
\put(5551,-3661){\makebox(0,0)[lb]{\smash{\SetFigFont{10}{12.0}{\rmdefault}{\mddefault}{\updefault}cut 2 (consistent)}}}
\put(601,-1636){\makebox(0,0)[lb]{\smash{\SetFigFont{10}{12.0}{\rmdefault}{\mddefault}{\updefault}$P_1$}}}
\put(601,-3136){\makebox(0,0)[lb]{\smash{\SetFigFont{10}{12.0}{\rmdefault}{\mddefault}{\updefault}$P_2$}}}
\end{picture}

\end{figure}
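
\noindent\textbf{Reading the example: } Label the events of $P_1$ as
$a_1,\ldots,a_6$ and those of $P_2$ as $b_1,\ldots,b_5$, left to right
(labels introduced here for convenience; the event positions are read
off the picture). Cut 1 is $\{a_1,a_2,a_3,b_1,b_2\}$. The message
consumed in $a_3$ is created in $b_3$, so $b_3 \to a_3$ with $a_3$
inside the cut and $b_3$ outside it; hence cut 1 is inconsistent. Cut
2 is $\{a_1,\ldots,a_4,b_1,b_2,b_3\}$. Every message consumed inside
it (in $b_1$ and in $a_3$) is also created inside it (in $a_1$ and in
$b_3$), so cut 2 is consistent; the messages created in $b_2$ and
$a_4$ are simply still in transit in the configuration it determines.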

A configuration is consistent if it is the configuration determined by
a consistent cut.

The \emph{Chandy--Lamport distributed snapshot algorithm} can be used
to construct a consistent configuration in an evolution of an
asynchronous system.\footnote{Hooray.}

For concreteness, suppose $P_1,\ldots,P_n$ are bank processes that
send one another amounts of money from accounts they maintain. Assume
that if $P_i$ and $P_j$ are adjacent, then $P_i$ sends infinitely many
messages to $P_j$, and assume that each $P_i$ consumes from each
$q_{ki}$ infinitely often.

Introduce a \emph{monitor} process $P_0$, which is directly connected
to each $P_i$. $P_0$ acts as follows (repeatedly):

\begin{itemize}
\item send \textit{take snapshot} to each $P_i$
\item receive (\textit{take snapshot} and) state from each $P_i$, and
construct configuration.
\end{itemize}

The behaviour of $P_i$ ($1 \le i \le n$) is augmented as follows:

\begin{itemize}
\item On first consuming \textit{take snapshot}, from $q_{ji}$ say
(without performing any money-transfer events in between):
\begin{itemize}
\item $s := \mi{state}$
\item send \textit{take snapshot} to each adjacent $P_k$
\item $Q_{hi} := \langle\rangle$ (all $h\ne 0$ adjacent)
\item $\mi{status}_j := \mi{finished}$
\item $\mi{status}_k := \mi{unfinished}$ ($k\ne j$ adjacent)
\end{itemize}
\item Resume executing money-transfer events.
\item On consuming $v$ from $q_{ki}$ with $\mi{status}_k =
\mi{unfinished}$, record $v$ in $Q_{ki}$.
\item On consuming \textit{take snapshot} from $q_{ki}$,
$\mi{status}_k := \mi{finished}$.
\item If every $\mi{status}_k = \mi{finished}$, send $s$ together
with all $Q_{ki}$ to $P_0$, and cease recording.
\end{itemize}
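
\medskip

\noindent\textbf{Example: } A small run with $n = 2$ shows how the
recorded queue contents make the snapshot add up. The balances and
amounts are invented for illustration, and the run assumes that the
recording of $s$ is triggered by the first \textit{take snapshot}
that a process consumes. Initially $P_1$ holds 100 and $P_2$ holds 50.

\begin{enumerate}
\item $P_1$ sends 10 to $P_2$ (balance 90), and $P_2$ sends 5 to $P_1$
(balance 45).
\item $P_0$ sends \textit{take snapshot} to $P_1$ and $P_2$.
\item $P_1$ consumes \textit{take snapshot} from $q_{01}$: it sets $s
:= 90$, sends \textit{take snapshot} to $P_0$ and $P_2$, and sets
$Q_{21} := \langle\rangle$, $\mi{status}_0 := \mi{finished}$,
$\mi{status}_2 := \mi{unfinished}$.
\item $P_2$ consumes 10 from $q_{12}$ (balance 55). It has consumed no
\textit{take snapshot} yet, so nothing is recorded.
\item $P_2$ consumes \textit{take snapshot} from $q_{02}$: it sets $s
:= 55$, sends \textit{take snapshot} to $P_0$ and $P_1$, and sets
$Q_{12} := \langle\rangle$, $\mi{status}_0 := \mi{finished}$,
$\mi{status}_1 := \mi{unfinished}$.
\item $P_1$ consumes 5 from $q_{21}$ (balance 95) while $\mi{status}_2
= \mi{unfinished}$, so $Q_{21}$ becomes $\langle 5 \rangle$.
\item $P_1$ consumes \textit{take snapshot} from $q_{21}$: every
status is now $\mi{finished}$, so it sends $s = 90$ and $Q_{21} =
\langle 5 \rangle$ to $P_0$.
\item $P_2$ consumes \textit{take snapshot} from $q_{12}$ and sends $s
= 55$ and $Q_{12} = \langle\rangle$ to $P_0$.
\end{enumerate}

$P_0$ constructs the configuration with balances 90 and 55 and the
amount 5 in transit on $q_{21}$. The total is $90 + 55 + 5 = 150 = 100
+ 50$, so no money is lost or counted twice.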

Global snapshots can be used to detect if a \emph{stable} property
holds, e.g. if there is a deadlock in the system. (Suggested reading:
Mullender ch. 4, Lynch ch. 19).

\section{Commit Protocols}

Consider a synchronous distributed database, whose component processes
can fail by stopping. To maintain consistency, no transaction can be
\emph{committed} by one process but \emph{aborted} by another. More
precisely, we consider first the \emph{two-phase commit protocol},
which ensures that if no process fails then a configuration is reached
in which every process has committed or every process has aborted. One
process is deemed the \emph{coordinator}.

In phase 1, the coordinator sets its status to the appropriate one of
\textit{can commit} and \textit{must abort}, and each non-coordinator
does likewise and sends its \emph{status} to the coordinator. If the
coordinator's status is \emph{can commit} and it receives notification
from each non-coordinator that \emph{its status} is \emph{can commit},
then the decision is \emph{commit}; otherwise the decision is
\emph{abort}. In phase 2, the coordinator sends the decision to each
non-coordinator, which acts on it.
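
\medskip

\noindent\textbf{Example: } The decision rule of phase 1 is a
conjunction of the statuses. As an illustration, take a coordinator
$P_0$ and two non-coordinators $P_1$, $P_2$ (the process names and
statuses are invented here), and write $\mi{status}_i$ for the status
of $P_i$ as known to the coordinator:
\[ \mi{decision} = \left\{ \begin{array}{ll}
  \mi{commit} & \mbox{if $\mi{status}_i$ is \textit{can commit} for
                 each $i \in \{0,1,2\}$,} \\
  \mi{abort} & \mbox{otherwise.}
\end{array} \right. \]
So the statuses (\textit{can commit}, \textit{can commit}, \textit{can
commit}) give \emph{commit}, while (\textit{can commit}, \textit{can
commit}, \textit{must abort}) give \emph{abort}. The second case also
covers a notification that never arrives. In phase 2 all three
processes act on the same decision.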

\subsubsection*{Lemma 10}
If in an admissible evolution of the two-phase commit protocol no
process fails, then a configuration is reached in which every process
has committed or every process has aborted.

\medskip

The \emph{three-phase commit protocol} modifies and extends the
two-phase commit protocol to take account of the possibility that the
coordinator may fail. It achieves the stronger property that a
configuration is reached in which every process \emph{that has not
failed} has committed or aborted, and in which it is not the case that
some process has committed and some [other] process has aborted.

Suggested reading: Lynch, section 7.


\end{document}

