Administrator
Administrator
Published on 2024-12-14 / 24 Visits
0
0

etcd

还是和zk一样只使用它的分布式锁 嘿嘿.....

etcd分布式锁的原理就是新增键值对。

ttl设置5秒并不是最大持有锁5秒;实测效果:如果etcd客户端没有断开连接,也没有执行释放,那么锁会一直持有;因为锁绑定一个租约(lease),租约的时间是设定的ttl时间,租约会自动续期。释放时,会移除lease并移除相关键值对

下载&运行

github链接

在release中找到自己系统对应的文件,下载完成后解压到文件夹中

直接运行etcd.exe,服务就运行起来了,默认端口2379

配置本机(可选)

创建conf.yaml

name: default #单机时随便写就好了
data-dir: ./data #存放数据目录(windows中的相对路径)
listen-client-urls: http://0.0.0.0:4001 #提供给客户端连接的端口
advertise-client-urls: http://127.0.0.1:4001 #可以用此URL在浏览器访问和使用RESTFUL Api

用配置文件运行

etcd.exe --config-file conf.yaml

RESTFul API

键名和键值都须用BASE64编码

作用

path

method

body

基础API

查看版本

/version

GET

查看健康状态

/health

GET

键值对API

存储或更新键值对

/v3/kv/put

POST

{"key": "Zm9v","value": "YmFy"}

查询键值对

/v3/kv/range

POST

{"key": "Zm9v"}

删除键值对

/v3/kv/deleterange

POST

{"key": "Zm9v"}

租约API

创建租约

/v3/lease/grant

POST

{"TTL":10}

//单位 秒

租约续期

/v3/lease/keepalive

POST

{"ID":12345678912345}

ID为创建时返回的ID

golang中使用

go get go.etcd.io/etcd/client/v3
package main

import (
	"context"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"log"
	"time"
)

func main() {
	// 创建 etcd 客户端
	client, err := clientv3.New(clientv3.Config{
		Endpoints:            []string{"localhost:4001"}, // etcd 的地址
		DialTimeout:          5 * time.Second,
		DialKeepAliveTime:    5 * time.Second,
		DialKeepAliveTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer client.Close()
	println("连接成功")

	for i := 0; i < 10; i++ {

		func() {
			session, err := concurrency.NewSession(client, concurrency.WithTTL(8)) // TTL 是锁的超时时间
			if err != nil {
				log.Fatalf("Failed to create etcd session: %v", err)
			}
			var t = time.Now().UnixMilli()
			println(i, "开始获取锁", t)
			lock := concurrency.NewMutex(session, "/my-lock/") // "/my-lock/" 是锁的键路径
			if err := lock.Lock(context.Background()); err != nil {
				println("获取锁失败", i, t)
				return
			}
			println("获取锁成功", i, t)
			defer session.Close()
			time.Sleep(4 * time.Second)
			println("已等待4秒", i, t)

			//if err := lock.Unlock(context.Background()); err != nil {
			//	fmt.Println("锁释放失败", i, err)
			//}
			//fmt.Println("锁释放成功", i)
		}()
	}
	time.Sleep(20 * time.Second)
}

node.js中使用

node.js官方文档

const { Etcd3 } = require('etcd3');
const client = new Etcd3({ hosts: 'http://localhost:4001' });



async function distributedLockExample(i) {
    const lock = client.lock("lock-name")
    lock.ttl(5) //5秒,默认30秒

    
    try {
        var ret
        ret = await lock.acquire(); //如果无法取得,会得到错误进入catch代码块

        console.log(`已取得锁`, i, await ret.leaseId());

        await new Promise(resolve => setTimeout(resolve, 2000)); // 模拟耗时任务

        console.log('任务结束', i)
    } catch (err) {
        console.log('取锁失败', i, t);
    } finally {
        await lock.release() //释放锁
    }
}


distributedLockExample(0).catch(console.error);


Comment