golang[109]-lab-模拟rpc远程过程调用

作为实践分布式系统的第一步,我们需要在本地模拟远程过程调用(RPC),就好像程序部署在不同的机器上、通过 TCP 相互连接一样。

构建client

1
2
3
4
// ClientEnd is a client-side RPC endpoint. Every ClientEnd created by a
// Network sends its requests on that network's shared request channel.
type ClientEnd struct {
	endname interface{} // name of this end-point
	ch      chan reqMsg // copy of Network.endCh; requests are sent here
}

endname 代表 client 的名字。
ch 与 network 共享(即 Network.endCh),用于把请求消息 reqMsg 传递给 network:

1
2
3
4
5
6
7
// reqMsg is a single RPC request travelling from a ClientEnd to the Network.
type reqMsg struct {
	endname  interface{}   // name of the sending ClientEnd
	svcMeth  string        // "Service.Method", e.g. "Raft.AppendEntries"
	argsType reflect.Type  // dynamic type of the caller's args value
	args     []byte        // gob-encoded args
	replyCh  chan replyMsg // the outcome of the call is delivered here
}

svcMeth 用于定位要请求的 service 以及要调用的方法,例如 "Raft.AppendEntries"。
argsType 存储请求参数的类型。例如,要远程调用的函数是:
func (js *JunkServer) Handler4(args *JunkArgs, reply *JunkReply) {
reply.X = “pointer”
}
那么 argsType 存储的是 *JunkArgs。
args 是参数经 gob 序列化后的字节数组。
replyCh 用于接收 network 返回的 replyMsg,其中 reply 字段是序列化后的返回值字节数组。

1
2
3
4
// replyMsg carries the outcome of one RPC back to the waiting caller.
// ok is false when the network delivered no reply; when ok is true, reply
// holds the gob-encoded reply value. Field order matters: callers build this
// with positional literals such as replyMsg{false, nil}.
type replyMsg struct {
	ok    bool
	reply []byte
}

client调用

模拟客户端远程过程调用:先用 gob 序列化参数,再把请求发送给 network,然后阻塞等待 replyCh 上的回复;若调用成功,则把回复反序列化到 reply 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Call sends an RPC request for svcMeth (e.g. "Raft.AppendEntries") through
// the network and blocks until the network delivers an outcome.
//
// args is gob-encoded, and its dynamic type is recorded in the request.
// reply must be something gob can decode into (i.e. a pointer). Call returns
// true if a reply was received and decoded into reply. It returns false if
// the network reported failure; reply is left untouched in that case.
// Failing to encode args or decode the reply terminates the process.
func (e *ClientEnd) Call(svcMeth string, args interface{}, reply interface{}) bool {
	req := reqMsg{}
	req.endname = e.endname
	req.svcMeth = svcMeth
	req.argsType = reflect.TypeOf(args)
	req.replyCh = make(chan replyMsg)

	// Serialize the arguments. An encoding failure is a programming error
	// (an unsupported type, for instance). Previously it was ignored and a
	// truncated request was sent anyway, so fail loudly instead, mirroring
	// the decode path below.
	qb := new(bytes.Buffer)
	qe := gob.NewEncoder(qb)
	if err := qe.Encode(args); err != nil {
		log.Fatalf("ClientEnd.Call(): encode args: %v\n", err)
	}
	req.args = qb.Bytes()

	// Hand the request to the network, then wait for its verdict.
	e.ch <- req
	rep := <-req.replyCh
	if !rep.ok {
		return false
	}

	// Deserialize the reply into the caller's value.
	rb := bytes.NewBuffer(rep.reply)
	rd := gob.NewDecoder(rb)
	if err := rd.Decode(reply); err != nil {
		log.Fatalf("ClientEnd.Call(): decode reply: %v\n", err)
	}
	return true
}

build network

1
2
3
4
5
6
7
8
9
10
11
// Network simulates the wire between ClientEnds and Servers. The methods that
// read or modify the maps below hold mu while doing so.
type Network struct {
	mu             sync.Mutex
	reliable       bool
	longDelays     bool                        // pause a long time on send on disabled connection
	longReordering bool                        // sometimes delay replies a long time
	ends           map[interface{}]*ClientEnd  // ends, by name
	enabled        map[interface{}]bool        // by end name
	servers        map[interface{}]*Server     // servers, by name
	connections    map[interface{}]interface{} // endname -> servername
	endCh          chan reqMsg                 // shared by every ClientEnd; carries all requests
}

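reliable 为 false 时模拟不可靠的网络(随机延迟、随机丢弃请求或回复)
longDelays 为 true 时,发往已禁用连接的请求会等待较长时间后才返回失败
longReordering 为 true 时,有时会把回复延迟较长时间,以模拟乱序
enabled 记录每个 ClientEnd 的连接是否可用
endCh 用于接收所有 client 发出的请求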

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// MakeNetwork returns a reliable Network with no ends or servers registered.
// It also starts the goroutine that hands every ClientEnd request to
// ProcessReq. That goroutine runs until endCh is closed; nothing here closes it.
func MakeNetwork() *Network {
	rn := &Network{
		reliable:    true,
		ends:        map[interface{}]*ClientEnd{},
		enabled:     map[interface{}]bool{},
		servers:     map[interface{}]*Server{},
		connections: map[interface{}]interface{}{},
		endCh:       make(chan reqMsg),
	}

	// One dispatcher drains endCh. Each request is then processed in its own
	// goroutine, so a slow RPC does not hold up the others.
	go func() {
		for incoming := range rn.endCh {
			go rn.ProcessReq(incoming)
		}
	}()

	return rn
}

network添加server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// AddServer registers rs under servername. Any server previously registered
// under the same name is replaced.
func (rn *Network) AddServer(servername interface{}, rs *Server) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.servers[servername] = rs
}


// DeleteServer marks servername as gone. The entry is overwritten with nil
// rather than removed from the map, so later lookups see no live server.
// NOTE(review): presumably IsServerDead relies on this to detect that the
// server an in-flight RPC started with has been replaced — confirm.
func (rn *Network) DeleteServer(servername interface{}) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.servers[servername] = nil
}

启用或禁用某个 ClientEnd 的网络连接

1
2
3
4
5
6
7
8

// Enable switches the connection of the ClientEnd named endname on or off.
// A disabled end's requests are not routed to any server.
func (rn *Network) Enable(endname interface{}, enabled bool) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.enabled[endname] = enabled
}

network add server

1
2
3
4
5
6
// AddServer makes rs reachable under servername, overwriting whatever server
// was registered there before.
func (rn *Network) AddServer(servername interface{}, rs *Server) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.servers[servername] = rs
}

network 连接client 与server

1
2
3
4
5
6
7
8
// Connect routes the ClientEnd named endname to the server named servername.
// The intended contract is that an end is connected only once in its
// lifetime. That is not enforced here: a later call simply overwrites the
// previous target.
func (rn *Network) Connect(endname interface{}, servername interface{}) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.connections[endname] = servername
}

network create client end

在这里,client(ClientEnd)是通过 network 创建的,新建的 ClientEnd 与 network 共享 endCh。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// MakeEnd creates a ClientEnd named endname that sends its requests on the
// network's shared endCh. No goroutine is started here. The new end starts
// disabled and unconnected; call Enable and Connect before using it.
// Reusing an existing name is fatal.
func (rn *Network) MakeEnd(endname interface{}) *ClientEnd {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	if _, exists := rn.ends[endname]; exists {
		log.Fatalf("MakeEnd: %v already exists\n", endname)
	}

	end := &ClientEnd{endname: endname, ch: rn.endCh}
	rn.ends[endname] = end
	rn.enabled[endname] = false
	rn.connections[endname] = nil
	return end
}

network 获取某个 server 收到的 RPC 数量

1
2
3
4
5
6
7
8
9

// GetCount returns the number of RPCs the server named servername has
// received. If no live server is registered under that name (never added,
// or removed via DeleteServer, which stores nil), it returns 0 instead of
// calling GetCount on a nil *Server.
func (rn *Network) GetCount(servername interface{}) int {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	svr := rn.servers[servername]
	if svr == nil {
		return 0
	}
	return svr.GetCount()
}

network处理client请求

ProcessReq 处理 client 发出的请求:
1、在网络不可靠时,模拟网络延迟以及请求/回复的随机丢失
2、找到此 client 连接的 server,调用 server 的 dispatch 方法
3、等待返回;如果等待期间 server 已被删除,则返回失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

// ProcessReq delivers one request from a ClientEnd to the server it is
// connected to. It sends exactly one replyMsg on req.replyCh before returning.
//
// Unroutable requests fail after a random delay, simulating a timeout. A
// request is unroutable when the end is disabled, unconnected, or its server
// is not registered. The delay is 0–6999 ms with longDelays, else 0–99 ms.
//
// Routable requests are dispatched to the server, with these exceptions:
//   - On an unreliable network the request is delayed 0–26 ms and then
//     dropped 10% of the time.
//   - On an unreliable network the reply is also dropped 10% of the time.
//   - With longreordering, about 2/3 of replies are held back for 200–2199 ms.
//   - If the server was deleted while the RPC was in flight, the reply is
//     replaced by a failure.
func (rn *Network) ProcessReq(req reqMsg) {
	enabled, servername, server, reliable, longreordering := rn.ReadEndnameInfo(req.endname)

	if !enabled || servername == nil || server == nil {
		// Simulate no reply and eventual timeout.
		ms := 0
		if rn.longDelays {
			// let Raft tests check that leader doesn't send
			// RPCs synchronously.
			ms = rand.Int() % 7000
		} else {
			// many kv tests require the client to try each
			// server in fairly rapid succession.
			ms = rand.Int() % 100
		}
		time.Sleep(time.Duration(ms) * time.Millisecond)
		req.replyCh <- replyMsg{false, nil}
		return
	}

	if !reliable {
		// Short delay.
		ms := rand.Int() % 27
		time.Sleep(time.Duration(ms) * time.Millisecond)
	}

	if !reliable && rand.Int()%1000 < 100 {
		// Drop the request, return as if timeout.
		req.replyCh <- replyMsg{false, nil}
		return
	}

	// Execute the request (call the RPC handler) in a separate goroutine so
	// that we can periodically check whether the server has been killed and
	// the RPC should get a failure reply. ech has a buffer of one so this
	// goroutine can always complete its send and exit, even after we have
	// stopped listening because the server died. With an unbuffered channel
	// it would block forever and leak.
	ech := make(chan replyMsg, 1)
	go func() {
		ech <- server.dispatch(req)
	}()

	// Wait for the handler to return, but stop waiting if DeleteServer()
	// has been called, and return an error.
	var reply replyMsg
	replyOK := false
	serverDead := false
	for !replyOK && !serverDead {
		select {
		case reply = <-ech:
			replyOK = true
		case <-time.After(100 * time.Millisecond):
			serverDead = rn.IsServerDead(req.endname, servername, server)
		}
	}

	// Do not reply if DeleteServer() has been called, i.e. the server has
	// been killed. This avoids a client getting a positive reply to an
	// Append while the server persisted the update into the old Persister.
	// config.go is careful to call DeleteServer() before superseding the
	// Persister.
	serverDead = rn.IsServerDead(req.endname, servername, server)

	switch {
	case !replyOK || serverDead:
		// Server was killed while we were waiting; return error.
		req.replyCh <- replyMsg{false, nil}
	case !reliable && rand.Int()%1000 < 100:
		// Drop the reply, return as if timeout.
		req.replyCh <- replyMsg{false, nil}
	case longreordering && rand.Intn(900) < 600:
		// Delay the response for a while: 200–2199 ms, skewed toward the
		// low end because the upper bound is itself random.
		ms := 200 + rand.Intn(1+rand.Intn(2000))
		time.Sleep(time.Duration(ms) * time.Millisecond)
		req.replyCh <- reply
	default:
		req.replyCh <- reply
	}
}

build server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Server is a collection of services that all share one RPC dispatcher, so
// that, for example, both a Raft and a k/v server can listen on the same RPC
// endpoint. AddService and dispatch hold mu while touching services and count.
type Server struct {
	mu       sync.Mutex
	services map[string]*Service
	count    int // incoming RPCs
}

// MakeServer returns a Server with no services registered.
func MakeServer() *Server {
	return &Server{services: make(map[string]*Service)}
}

server 是由一系列的service组成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Server groups several services behind a single RPC dispatcher. This lets,
// say, a Raft peer and a k/v server share one RPC endpoint. mu guards
// services and count.
type Server struct {
	mu       sync.Mutex
	services map[string]*Service
	count    int // incoming RPCs
}

// MakeServer creates an empty Server, ready for AddService.
func MakeServer() *Server {
	srv := new(Server)
	srv.services = make(map[string]*Service)
	return srv
}

server 是由一系列的service组成的

1
2
3
4
5
6

// AddService registers svc under its name. A later service with the same
// name replaces the earlier one.
func (rs *Server) AddService(svc *Service) {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	rs.services[svc.name] = svc
}

server 的一个重要方法是 dispatch,它根据 svcMeth 找到 client 具体要调用的是哪个 service 的哪个方法,例如 "Raft.AppendEntries"。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

// dispatch splits req.svcMeth at the last "." and routes the request. The
// part before the dot names the Service; the part after it names the method
// to invoke. Every request increments the server's RPC count.
//
// An unknown service is treated as a programming error and terminates the
// process. This includes a svcMeth with no "." at all, which previously
// caused a slice-bounds panic. The fatal message lists the registered
// services.
func (rs *Server) dispatch(req reqMsg) replyMsg {
	rs.mu.Lock()

	rs.count++

	// Split "Raft.AppendEntries" into service "Raft" and method
	// "AppendEntries". Without a dot the whole string is taken as the service
	// name, so the lookup below fails with a clear message.
	serviceName, methodName := req.svcMeth, ""
	if dot := strings.LastIndex(req.svcMeth, "."); dot >= 0 {
		serviceName = req.svcMeth[:dot]
		methodName = req.svcMeth[dot+1:]
	}

	service, ok := rs.services[serviceName]
	if ok {
		// Release rs.mu before invoking the service, so concurrent RPCs are
		// not serialized on this lock.
		rs.mu.Unlock()
		return service.dispatch(methodName, req)
	}

	// Collect the registered names while still holding rs.mu. Reading the
	// map after unlocking would race with AddService.
	choices := make([]string, 0, len(rs.services))
	for k := range rs.services {
		choices = append(choices, k)
	}
	rs.mu.Unlock()

	log.Fatalf("labrpc.Server.dispatch(): unknown service %v in %v.%v; expecting one of %v\n",
		serviceName, serviceName, methodName, choices)
	return replyMsg{false, nil}
}

build service

1
2
3
4
5
6
7
8
// Service wraps a receiver object whose qualifying methods can be invoked via
// RPC. A single Server may host more than one Service.
type Service struct {
	name    string                    // receiver's type name; the key used by Server
	rcvr    reflect.Value             // the receiver itself
	typ     reflect.Type              // the receiver's type as passed (possibly a pointer type)
	methods map[string]reflect.Method // handler methods, by name
}

methods 存储所有符合 handler 规范的方法
typ 存储接收者(结构体)的反射类型,rcvr 存储接收者的反射值
name 是结构体的名字
MakeService 方法将某个结构体(即 service)中所有符合规范的方法保存起来。

规定方法必须是导出的(首字母大写),并且通过反射得到的方法类型必须有三个参数:
第1个是接收者,即调用方法的结构体
第2个是request参数
第3个是reply参数,且必须是指针
没有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// MakeService builds a Service for rcvr and registers every method that can
// act as an RPC handler. A method qualifies when all of these hold:
//   - it is exported (method.PkgPath is empty);
//   - its reflected type takes exactly three inputs: receiver, args, reply;
//   - the reply parameter is a pointer;
//   - it returns nothing.
//
// Other methods are silently skipped. The service name is the name of
// rcvr's underlying (pointed-to) type.
func MakeService(rcvr interface{}) *Service {
	svc := &Service{
		typ:     reflect.TypeOf(rcvr),
		rcvr:    reflect.ValueOf(rcvr),
		methods: map[string]reflect.Method{},
	}
	svc.name = reflect.Indirect(svc.rcvr).Type().Name()

	for i, n := 0, svc.typ.NumMethod(); i < n; i++ {
		method := svc.typ.Method(i)
		sig := method.Type

		// args may be a value or a pointer; only reply must be a pointer.
		// The && chain short-circuits, so In(2) is only inspected when
		// there are exactly three inputs.
		isHandler := method.PkgPath == "" &&
			sig.NumIn() == 3 &&
			sig.In(2).Kind() == reflect.Ptr &&
			sig.NumOut() == 0
		if !isHandler {
			continue
		}
		svc.methods[method.Name] = method
	}

	return svc
}

测试

用一个简单的例子来测试并说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

// JunkArgs is the request type used by the example handlers.
type JunkArgs struct {
	X int
}

// JunkReply is the reply type used by the example handlers.
type JunkReply struct {
	X string
}

// JunkServer is a toy RPC receiver used to exercise the package.
type JunkServer struct {
	mu   sync.Mutex
	log1 []string
	log2 []int
}

// Handler4 takes its args by pointer.
func (js *JunkServer) Handler4(args *JunkArgs, reply *JunkReply) {
	reply.X = "pointer"
}

// Handler5 takes its args by value.
func (js *JunkServer) Handler5(args JunkArgs, reply *JunkReply) {
	reply.X = "no pointer"
}

// TestTypes checks that a handler is reached whether its args parameter is
// declared as a pointer or as a value. The args passed to Call must match
// the handler's declaration.
func TestTypes(t *testing.T) {
	// 1. create network
	rn := MakeNetwork()
	// 2. create client
	e := rn.MakeEnd("end1-99")
	js := &JunkServer{}
	// 3. create service
	svc := MakeService(js)
	// 4. create server
	rs := MakeServer()
	// 5. service add to server
	rs.AddService(svc)
	// 6. server add to network
	rn.AddServer("server99", rs)
	// 7. client connect to server
	rn.Connect("end1-99", "server99")
	// 8. enable network
	rn.Enable("end1-99", true)

	// 9. call each service and wait for the reply.
	calls := []struct {
		method string
		args   interface{} // pointer or value, matching the handler
		want   string
		failed string
	}{
		{"JunkServer.Handler4", &JunkArgs{}, "pointer", "wrong reply from Handler4"},
		{"JunkServer.Handler5", JunkArgs{}, "no pointer", "wrong reply from Handler5"},
	}
	for _, c := range calls {
		var reply JunkReply
		e.Call(c.method, c.args, &reply)
		if reply.X != c.want {
			t.Fatal(c.failed)
		}
	}
}

总结与使用

此 package 很好地在本地模拟了远程过程调用,为实现分布式算法奠定了坚实的基础。
使用此package的9个步骤:

1
2
3
4
5
6
7
8
9
// 1. create network
// 2. create client
// 3. create service
// 4. create server
// 5. service add to server
// 6. server add to network
// 7. client connect to server
// 8. enable network
// 9. call the service and wait for the reply

完整实验代码

https://github.com/dreamerjackson/golang-deep-distributed-lab
后续的测试可以多参考该仓库中的代码。