Administrator
Administrator
发布于 2025-08-17 / 47 阅读
0
0

consul集群+分布式锁

操作系统: Ubuntu 24.04 LTS / CentOS 9

Consul版本: 1.24.1

ssh客户端: termius 9.28.0

(Ubuntu)安装软件-apt

https://developer.hashicorp.com/consul/install找到对应系统的安装命令

sudo ufw disable #先关闭防火墙

wget -O - https://apt.releases.hashicorp.com/gpg | sudo gpg --dearmor -o /usr/share/keyrings/hashicorp-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] https://apt.releases.hashicorp.com $(grep -oP '(?<=UBUNTU_CODENAME=).*' /etc/os-release || lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/hashicorp.list
sudo apt update && sudo apt install consul
consul -v

配置文件目录:/etc/consul.d (下面有一个初始的consul.hcl配置文件)

systemctl的service文件所在路径:/usr/bin/systemd/system/consul.service

(CentOS系列)安装软件-yum

sudo systemctl stop firewalld #先关闭防火墙

sudo yum-config-manager --add-repo https://rpm.releases.hashicorp.com/RHEL/hashicorp.repo
sudo yum -y install consul
consul -v

配置文件

每个服务器节点在/etc/consul.d目录中添加一个JSON配置文件

第一台服务器将其中的129.211.xxx.xxx为公网IP

后续服务器除修改IP外,将bootstrap_expect这一行删除(只有第一个节点需要bootstrap mode)

{
    "server": true,
    "bootstrap_expect": 1,
    "bind_addr":"0.0.0.0",
    "client_addr":"0.0.0.0",
    "advertise_addr":"129.211.xxx.xxx",
    "data_dir":"/opt/consul",
    "ui_config":{
        "enabled": true
    }
}

放通端口

端口:8300、8301、8302、8500、8501、8600

如果是云服务器,需要注意有二处需要放通,操作系统的防火墙和云服务器的安全组。

启动

sudo systemctl enable consul
sudo systemctl start consul #如果启动长时间卡住,CTRL+C中断即可,服务其实已经启动

join到集群

第二台和第三台分别执行命令行:

consul join <第一台服务器的IP>
consul members #查看集群状态

也可以在http://<公网IP>:8500网页上查看

golang测试分布式锁-KV方式

程序会在下一个30秒或整分的时候执行

go get github.com/hashicorp/consul/api
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/hashicorp/consul/api"
)

func main() {
	var timeCur = time.Now()
	var nextMinute time.Time
	if timeCur.Second() < 30 {
		nextMinute = time.Date(timeCur.Year(), timeCur.Month(), timeCur.Day(), timeCur.Hour(), timeCur.Minute(), 30, 0, time.Local)
	} else {
		nextMinute = time.Date(timeCur.Year(), timeCur.Month(), timeCur.Day(), timeCur.Hour(), timeCur.Minute(), 59, 0, time.Local)
	}
	var duration = nextMinute.Sub(timeCur)
	fmt.Println(duration)
	time.Sleep(duration)

	// 创建 Consul 客户端配置
	config := &api.Config{
		Address: "localhost:8500", // 替换为你的 Consul 地址
	}
	client, err := api.NewClient(config)
	if err != nil {
		log.Fatalf("failed to create Consul client: %v", err)
	}

	// 获取 KV 和 Session API
	kvs := client.KV()
	session := client.Session()

	// 创建一个 session(分布式锁的核心)
	sessionID, _, err := session.Create(&api.SessionEntry{
		Behavior: api.SessionBehaviorRelease, // 释放会话后释放锁
		TTL:      "10s",                      // 会话的生存时间
	}, nil)
	if err != nil {
		log.Fatalf("failed to create session: %v", err)
	}
	defer session.Destroy(sessionID, nil)

	// 获取一个独占锁的 Key
	lockKey := "lock/key01"

	// 尝试获取锁
	lockValue := []byte("locked")
	lockPair := &api.KVPair{
		Key:     lockKey,
		Value:   lockValue,
		Session: sessionID,
	}

	// 设置分布式锁(使用 Key/Value 存储)
	b1, _, err := kvs.Acquire(lockPair, nil)
	if err != nil {
		log.Printf("failed to acquire lock: %v", err)
		return
	}
	if b1 == false {
		fmt.Println("获取锁失败")
		return
	}

	fmt.Println("Lock acquired!")

	// 模拟业务处理
	time.Sleep(5 * time.Second)

	// 释放锁
	_, _, err = kvs.Release(lockPair, nil)
	if err != nil {
		log.Printf("failed to release lock: %v", err)
		return
	}
	fmt.Println("Lock released!")
}

编绎后放到三台服务器,chmod 777 <程序名>给权限

golang使用consul仿原生锁(锁原语)

如果无法取得锁会一直等,像GOLANG原生的锁Lock和Unlock一样使用

要注意的是用LockOpts的session会有协程自动做续期(renew),如果锁不释放session一直不会过期

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, _ := api.NewClient(&api.Config{
		Address: "localhost:8500",
	})

	for i := 0; i < 3; i++ {
		go func(n int) {

			lock, _ := client.LockOpts(&api.LockOptions{
				Key:   "lock2",
				Value: []byte(fmt.Sprintf("test_lock%d", n)),
				SessionOpts: &api.SessionEntry{
					Behavior: api.SessionBehaviorDelete,
					TTL:      "15s",
				},
				LockTryOnce: false, //只尝试一次等于false
			})
			stopCh := make(chan struct{})

			_, err := lock.Lock(stopCh)
			if err != nil {
				fmt.Println(i, time.Now().Format(time.DateTime), "取锁失败", err)
				return
			}
			defer func() {
				err := lock.Unlock()
				fmt.Println(i, time.Now().Format(time.DateTime), "释放锁", err)
			}()

			fmt.Println(i, time.Now().Format(time.DateTime), "取锁成功")
			time.Sleep(time.Second * 20) //模拟业务执行
		}(i)
	}
	time.Sleep(time.Hour)
}


评论