1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func watch(cli *clientv3.Client) { kv := clientv3.NewKV(cli)
go func() { for { _, _ = kv.Put(context.TODO(), "/language", "go") _, _ = kv.Delete(context.TODO(), "language") time.Sleep(time.Second) } }()
getResp, err := kv.Get(context.TODO(), "language") if err != nil { log.Fatal(err) }
for _, ev := range getResp.Kvs { fmt.Printf("Get %s: %s\n", ev.Key, ev.Value) }
watchStartVersion := getResp.Header.Revision + 1 fmt.Printf("Start watching from version: %d\n", watchStartVersion)
watcher := clientv3.NewWatcher(cli)
ctx, cancel := context.WithCancel(context.TODO()) time.AfterFunc(5*time.Second, func() { cancel() })
watchRespChan := watcher.Watch(ctx, "language", clientv3.WithRev(watchStartVersion)) for watchResp := range watchRespChan { for _, event := range watchResp.Events { switch event.Type { case mvccpb.PUT: fmt.Printf("Modify: %s, %v, %v\n", event.Kv.Value, event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE: fmt.Printf("Delete: %v\n", event.Kv.ModRevision) } } } }
|