上一篇讲到了当拆分了项目中的服务到本地后如何配置一个最简单的RPC以及RPC源码的阅读。不过既然已经开始使用远程调用了,架构肯定会向服务集群的方向发展,也就是一个服务由多个服务器,客户端需要对这些服务器进行随机或有规则的访问。
如果是http请求的话,可以使用nginx反向代理做负载均衡等方式,但是这里RPC使用的是TCP协议,并不经过nginx。
假设现在要做一个直播相关项目,因为弹幕相关功能负载非常高,所以需要对弹幕相关功能进行了服务拆分,并为了高可用给弹幕服务加装了2台服务器
[root@iZbp1acp86oa3ixxw4n1dpZ ~]# docker inspect swoole
[
{
"Name": "swoole",
......
"Containers": {
"287fefe25feebd7ebf6f058056acdfabb470a889a539c60fed202ef377c6dc11": {
"Name": "swoft_11_8020-8029",
"IPv4Address": "172.100.0.13/16",
},
"914bf1249a663f7226fac980e36a7d2dd93a3b295bc88ba27938563623fe3347": {
"Name": "swoft_11_8010-8019",
"IPv4Address": "172.100.0.12/16",
},
"d1552fc5da30f8fd3c423c4b22d4f07e1da5460419e522cffd793ac9a66ab9c1": {
"Name": "swoft_11_8000-8010",
"IPv4Address": "172.100.0.11/16",
}
},
}
]
如上,开启了3个docker集群,ip分别为172.100.0.13,172.100.0.12,172.100.0.11。
use Swoft\Rpc\Client\Client as ServiceClient;
......
'user' => [
'class' => ServiceClient::class,
'host' => '127.0.0.1',
'port' => 8001,
'setting' => [
'timeout' => 0.5,
'connect_timeout' => 1.0,
'write_timeout' => 10.0,
'read_timeout' => 0.5,
],
'packet' => bean('rpcClientPacket')
],
在配置中使用Swoft\Rpc\Client\Client as ServiceClient
作为了客户端,要修改客户端代码,做到客户端随机访问集群,只要修改Swoft\Rpc\Client\Client
后并在此即可
public function createConnection(Pool $pool): Connection
{
$connection = Connection::new($this, $pool);
$connection->create();
return $connection;
}
客户端中使用Swoft\Rpc\Client\Connection
进行了连接
/**
* @throws RpcClientException
*/
public function create(): void
{
$connection = new Client(SWOOLE_SOCK_TCP);
[$host, $port] = $this->getHostPort(); // 这里获取了ip和端口
$setting = $this->client->getSetting();
if (!empty($setting)) {
$connection->set($setting);
}
if (!$connection->connect($host, (int)$port)) {
throw new RpcClientException(
sprintf('Connect failed host=%s port=%d', $host, $port)
);
}
$this->connection = $connection;
}
......
/**
* @return array
* @throws RpcClientException
*/
private function getHostPort(): array
{
$provider = $this->client->getProvider(); // swoft定义了一个服务提供者,但是并没有用上,同时可以看到服务提供者是注入在客户端内的
if (empty($provider) || !$provider instanceof ProviderInterface) { // 如果没有服务提供者就会返回配置中配置的ip和端口
return [$this->client->getHost(), $this->client->getPort()];
}
$list = $provider->getList($this->client);
if (empty($list)) {
throw new RpcClientException(
sprintf('Provider return list can not empty!')
);
}
if (!is_array($list)) {
throw new RpcClientException(
sprintf('Provider(%s) return format is error!', JsonHelper::encode($list))
);
}
$randKey = array_rand($list, 1);
$hostPort = explode(':', $list[$randKey]);
if (count($hostPort) < 2) {
throw new RpcClientException(
sprintf('Provider(%s) return format is error!', JsonHelper::encode($hostPort))
);
}
[$host, $port] = $hostPort;
return [$host, $port];
}
从连接类里的代码可以看出来,连接类会使用客户端内的信息去做和RPC之间的连接通信,连接配置会从客户端中取。连接类会先查看客户端中是否有服务提供者$this->client->getProvider()
,没有的话会直接使用客户端配置中自己的ip和端口
首先看看client->getProvider()
/**
* @var ProviderInterface
*/
protected $provider;
/**
* @return ProviderInterface
*/
public function getProvider(): ?ProviderInterface
{
return $this->provider;
}
客户端没有进行任何针对服务提供者的操作,仅仅定义了一个服务提供接口。所以在这里只要实现服务提供者类并修改getProvider()
即可实现这篇想达到的功能,也就是客户端访问集群。那么再看看ProviderInterface
/**
* Class ProviderInterface
*
* @since 2.0
*/
interface ProviderInterface
{
/**
* @param Client $client
*
* @return array
*
* @example
* [
* 'host:port',
* 'host:port',
* 'host:port',
* ]
*/
public function getList(Client $client): array;
}
可以看到这个接口是非常简单的,只需要实现一个getList
方法,并返回host:port
数组即可
实现一个弹幕服务提供者BulletScreenProvider
class BulletScreenProvider implements ProviderInterface
{
public function getList(Client $client): array
{
return ['172.100.0.12:8011','172.100.0.13:8029'];
}
}
继承并重写客户端
use Swoft\Rpc\Client\Client;
use Swoft\Rpc\Client\Contract\ProviderInterface;
class RpcClient extends Client
{
public function getProvider(): ?ProviderInterface
{
$this->provider = new BulletScreenProvider();
return $this->provider;
}
}
修改客户端配置为新继承的客户端,为了不影响源码新加了一段配置
'bulletScreen' => [
'class' => RpcClient::class,
'host' => '127.0.0.1', // 这个ip并不会起到任何作用,因为会使用服务提供者设置的ip
'port' => 8001, // 这个port并不会起到任何作用,因为会使用服务提供者设置的port
'setting' => [
'timeout' => 0.5,
'connect_timeout' => 1.0,
'write_timeout' => 10.0,
'read_timeout' => 0.5,
],
'packet' => bean('rpcClientPacket')
],
'bulletScreen.pool' => [
'class' => ServicePool::class,
'client' => bean('bulletScreen'),
],
增加了一个弹幕服务测试rpc接口
/**
* Class UserService
*
* @since 2.0
* @Service()
*/
class BulletScreenService implements BulletScreenInterface
{
/**
* @return string
*/
public function getIp(): string
{
return gethostname();
}
}
增加rpc访问路由
/**
* Class RpcController
*
* @since 2.0
*
* @Controller()
*/
class RpcController
{
/**
* @Reference(pool="bulletScreen.pool")
*
* @var BulletScreenInterface
*/
private $bulletScreenService;
/**
* @RequestMapping("bulletScreenTest")
*
* @throws Exception
*/
public function bulletScreenTest()
{
var_dump($this->bulletScreenService->getIp());
var_dump($this->bulletScreenService->getIp());
var_dump($this->bulletScreenService->getIp());
var_dump($this->bulletScreenService->getIp());
var_dump($this->bulletScreenService->getIp());
var_dump($this->bulletScreenService->getIp());
return $this->bulletScreenService->getIp();
}
}
访问结果
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "731e95899f6a"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "731e95899f6a"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
string(12) "b9326b6e0877"
出现了两个不同的hostname,也就是说配置是生效的。当然也可以在连接源码中的getHostPort打印验证,在这里就不动源码了。
当然,实际中不可能靠人为手动维护服务集群的ip和端口,后面会尝试在这个集群中引入Consul来做服务注册,服务发现,服务治理。不过前提是我的小水管性能突破型测试机撑得住,这次开3个docker容器开启了3个swoft服务我的测试机cpu都快100%了XD
本文为龚学鹏原创文章,转载无需和我联系,但请注明来自龚学鹏博客http://www.noobcoder.cn
最新评论