Skip to content

基于Kafka的集中式sserver信息查询控制接口

YihaoPeng edited this page Jul 24, 2020 · 9 revisions

sserver新增配置

sserver新增了以下配置:

management = { 
 enabled = true; # default: true 
 kafka_brokers = "kafka:9092";
 controller_topic = "BtcManController"; 
 processor_topic = "BtcManProcessor"; 
 auto_switch_chain = false; 
}; 

其中,management_controller_topic用于发送控制指令,management_processor_topic用于接收处理结果。 我本来想用masterslave,但在国外可能不太政确,所以就另找了两个词:see_no_evil:

现有的主控程序

ChainSwitcher 是一个实现机枪池的主控程序,它会通过BtcManController定时向sserver发送当前机枪币种信息。

此外,如果有sserver重启,ChainSwitcher会通过management_processor_topic收到通知,然后立即重发当前机枪币种信息,防止刚启动的sserver没有机枪币种信息。

两者通信中采用的具体指令下面会介绍。

现有的受控程序

sserver是目前唯一的受控程序。

信道

通过 Kafka topic 传递JSON字符串。

Topic 中数据的传递方向是单向的,sserver只会向management_processor_topic发送消息,主控程序只会向management_controller_topic发送消息。

可以使用控制台生产消费命令发送指令/查看结果。 如果通过btcpool/docker/btcpool/test/btc/run-manually.sh启动测试环境,则可以使用以下命令:

# 列出相关topic
docker exec btc_kafka_1 /opt/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 | grep Man

# 查看以前的受控端消息
docker exec btc_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic BtcManProcessor

# 查看以前的主控端消息
docker exec btc_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic BtcManController

# 发送控制命令
docker exec -it btc_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic BtcManController

# 查看当前的受控端消息
docker exec btc_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic BtcManProcessor

# “交互式控制台”
broker="kafka:9092"
docker exec btc_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server "$broker" --topic BtcManProcessor &
docker exec -it btc_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list "$broker" --topic BtcManController; fg

使用kafkacat

# 安装kafkacat
sudo apt install kafkacat

# 列出相关topic
kafkacat -L -b kafka:9092 | grep -A1 Man

# 查看以前的受控端消息
kafkacat -C -b kafka:9092 -t BtcManProcessor -o beginning

# 查看以前的主控端消息
kafkacat -C -b kafka:9092 -t BtcManController -o beginning

# 发送控制命令
kafkacat -P -b kafka:9092 -t BtcManController

# 查看当前的受控端消息
kafkacat -C -b kafka:9092 -t BtcManProcessor -o end

# “交互式控制台”
broker="kafka:9092"
kafkacat -C -b "$broker" -t BtcManProcessor -o end &
kafkacat -P -b "$broker" -t BtcManController; fg

消息

sserver上线下线通知

sserver主动发送至management_processor_topic。 注意:若sserver非正常退出,可能不会有下线通知。此外,JSON实际是单行,这里美化了一下。

{
	"action": "online或offline",
	"config": {
		"chains": {
			"default": {
				"common_events_topic": "CommonEvents",
				"job_topic": "StratumJob",
				"kafka_brokers": "kafka:9092",
				"name": "default",
				"share_topic": "ShareLog",
				"single_user_id": 0,
				"solved_share_topic": "SolvedShare",
				"users_list_id_api_url": "http://userlist/userlist.php"
			}
		},
		"management": {
			"auto_switch_chain": false,
			"controller_topic": "BtcManController",
			"kafka_brokers": "kafka:9092",
			"processor_topic": "BtcManProcessor"
		},
		"sserver": {
			"default_difficulty": 1,
			"dev_fixed_difficulty": 1.0,
			"diff_adjust_period": 900,
			"enable_dev_mode": false,
			"enable_simulator": false,
			"enable_submit_invalid_block": false,
			"id": 1,
			"ip": "0.0.0.0",
			"max_difficulty": 4611686018427387904,
			"max_job_lifetime": 300,
			"min_difficulty": 1,
			"mining_notify_interval": 3,
			"multi_chains": true,
			"nicehash": {
				"forced": false,
				"min_difficulty": 1,
				"min_difficulty_zookeeper_path": ""
			},
			"port": 1293,
			"share_avg_seconds": 10,
			"shutdown_grace_period": 3600,
			"type": "BTC"
		},
		"users": {
			"auto_reg_max_pending_users": 50,
			"case_insensitive": true,
			"enable_auto_reg": false,
			"namechains_check_interval": 300,
			"strip_user_suffix": true,
			"user_suffix_separator": "_",
			"zookeeper_auto_reg_watch_dir": "",
			"zookeeper_userchain_map": ""
		}
	},
	"created_at": "2019-12-19 09:39:59",
	"host": {
		"hostname": "29098db05daf",
		"ip": {
			"eth0": ["172.20.0.14"]
		}
	},
	"server_id": 1,
	"status": {
		"chains": {
			"default": {
				"last_job_height": 0,
				"last_job_id": 0,
				"last_job_time": 0,
				"name": "default",
				"share_status": {}
			}
		},
		"connections": {
			"count": 0,
			"state": {}
		},
		"uptime": 1576748399
	},
	"type": "sserver_notify"
}

sserver状态查询请求

由主控端发送到management_controller_topic

{"id":"xxx","type":"sserver_cmd","action":"get_status"}

可使用过滤器查询特定server id

{"id":"1","filter":{"server_id":{"1":true}},"type":"sserver_cmd","action":"get_status"}
{"id":"2","filter":{"server_id":{"2":true,"3":true}},"type":"sserver_cmd","action":"get_status"}

或者排除特定server id

{"id":"3","filter":{"exclusion":true,"server_id":{"1":true}},"type":"sserver_cmd","action":"get_status"}
{"id":"4","filter":{"exclusion":true,"server_id":{"2":true,"3":true}},"type":"sserver_cmd","action":"get_status"}

过滤器

过滤器是一个以要选中的sserverid为键,以true为值的JSON对象,如{"5":true,"13":"true"}。 目前仅支持server_id过滤器,结构如下:

{...,"filter":{"server_id":{"2":true,"3":true,...}},...}

否定模式,排除指定的server_id,其他sserver会对操作做出响应:

{...,"filter":{"exclusion":true,"server_id":{"2":true,"3":true,...}},...}

任何发送到management_controller_topic的命令都可以使用过滤器。

sserver状态查询响应

sserver在收到查询请求后发送到management_processor_topic。 结构与sserver上线下线通知类似:

{
	"action": "get_status",
	"config": {
		与sserver上线下线消息一致,此处省略
	},
	"created_at": "2019-12-19 10:48:56",
	"host": {
		"hostname": "29098db05daf",
		"ip": {
			"eth0": ["172.20.0.14"]
		}
	},
	"id": "xxx",
	"server_id": 1,
	"status": {
		"chains": {
			"default": {
				"last_job_height": 214,
				"last_job_id": 6772100554530226177,
				"last_job_time": 1576752534,
				"name": "default",
				"share_status": {
					"23": 850,
					"37": 5556
				}
			}
		},
		"connections": {
			"count": 18842,
			"state": {
				"default": {
					"authenticated": 16396,
					"connected": 2446
				}
			}
		},
		"uptime": 1576748399
	},
	"type": "sserver_response"
}

若查询请求使用了过滤器,则只有server id匹配的sserver会做出响应。

id

响应的id就是对应请求的idid可以是任意类型,包括null、数组和对象。比如,以下请求合法:

{"id":{"x":1,"y":2,"d":3,"z":[4,5,6]},"filter":{"server_id":{"1":true}},"type":"sserver_cmd","action":"get_status"}

但是请注意,响应不保证对象中元素的顺序,比如响应id可能变成这样:

"id":{"d":3,"x":1,"y":2,"z":[4,5,6]}

share_status

2337是share状态的代码,以下是对照表:

status status_name
1798084231 accept
950395421 stale
1422486894 block found
1713984938 uncle block
21 job not found or stale
22 duplicate
23 low difficulty
31 time too old
32 time too new
33 wrong version
34 invalid solution
35 wrong nonce prefix
36 job not found
37 stale

列出sserver的简要信息

get_status返回的内容太多,不方便在控制台消费器中阅读。可以改用list命令,返回的内容一行可以看完。

请求

{"id":"xxx","type":"sserver_cmd","action":"list"}

响应

{"action":"list","created_at":"2019-12-19 12:06:42","host":{"hostname":"55db881a9aa2","ip":{"eth0":["172.22.0.15"]}},"id":"xxx","server_id":1,"status":{"connections":{"count":0},"uptime":1576757064},"type":"sserver_response"}
{"action":"list","created_at":"2019-12-19 12:06:42","host":{"hostname":"d3d666b5295c","ip":{"eth0":["172.22.0.10"]}},"id":"xxx","server_id":2,"status":{"connections":{"count":0},"uptime":1576757063},"type":"sserver_response"}
{"action":"list","created_at":"2019-12-19 12:06:42","host":{"hostname":"b829bd910d9f","ip":{"eth0":["172.22.0.14"]}},"id":"xxx","server_id":3,"status":{"connections":{"count":0},"uptime":1576757063},"type":"sserver_response"}
{"action":"list","created_at":"2019-12-19 12:06:42","host":{"hostname":"19890ab9ad71","ip":{"eth0":["172.22.0.16"]}},"id":"xxx","server_id":4,"status":{"connections":{"count":0},"uptime":1576757063},"type":"sserver_response"}

机枪池币种切换请求

由主控端发送到management_controller_topic

主控端ChainSwitcher会在这些情况下发送切换请求:

  1. 轮询发现另一个币种收益更高。
  2. 即使收益最高的币种没有变化,每隔指定的秒数(默认60秒)后也会重发一次,以保持心跳(也防止某些sserver错过之前的消息)。
  3. management_processor_topic收到sserveronline消息后会立即重发一次,让刚启动的sserver马上获取机枪币种。

请求消息:

{"id":1,"type":"sserver_cmd","action":"auto_switch_chain","chain_name":"btc"}
{"id":2,"type":"sserver_cmd","action":"auto_switch_chain","chain_name":"bcc"}

可带过滤器,只切换特定sserver的币种:

{"id":3,"filter":{"server_id":{"12":true}},"type":"sserver_cmd","action":"auto_switch_chain","chain_name":"btc"}
{"id":4,"filter":{"server_id":{"3":true,"4":true}},"type":"sserver_cmd","action":"auto_switch_chain","chain_name":"bcc"}

注意,因为ChainSwitcher切换程序发送的切换请求不带任何过滤器,会匹配所有sserver,所以一但ChainSwitcher发送定时切换请求,币种就会被切回去。若要实现按sserver切换币种并长久保持,需要修改ChainSwitcher切换程序,让它附带对应的过滤器。

机枪切换响应

sserver在收到切换请求并实际完成切换后发送到management_processor_topic。注意,如果实际币种没有变化(比如切换前后均为BTC),或者该sserver没有机枪用户,则不会有响应。

和一键切换的兼容性:只有当前币种为"auto"的用户会被切到新币种,当前币种为"btc"等具体币种的用户不会切换。

实际是单行,这里美化了一下:

{
	"action": "auto_switch_chain",
	"created_at": "2019-10-28 23:19:09",
	"id": 1,
	"new_chain_name": "btc",
	"old_chain_name": "bch",
	"result": true,
	"server_id": 1,
	"switched_connections": 11,
	"switched_users": 2,
	"type": "sserver_response"
}

sserver异常

sserver捕获到异常,会主动将其内容发送至management_processor_topic

注意:若sserver在发生异常后马上崩溃,则异常通知消息无法发出。

消息形如:

{
	"action": "exception",
	"config": {
		与sserver上线下线消息一致,此处省略
	},
	"created_at": "2019-12-19 10:23:34",
	"exception": {
		"what": "[json.exception.parse_error.101] parse error at line 1, column 26: syntax error while parsing object key - unexpected '['; expected string literal"
	},
	"host": {
		"hostname": "e2a121244c2f",
		"ip": {
			"eth0": ["172.20.0.10"]
		}
	},
	"server_id": 1,
	"status": {
		与sserver上线下线消息一致,此处省略
	},
	"type": "sserver_notify"
}