rService(serviceName, groupName, instance);
}
查询实例
示例代码:
// Example: fetch all instances of ORDER_SERVICE (true is passed as the subscribe flag) and print them.
NacosNamingService naming = new NacosNamingService(NACOS_HOST);
List<Instance> found = naming.getAllInstances(ORDER_SERVICE, true);
System.out.printf(">> instance count=%d\n", found.size());
// One summary row per instance.
String rowFormat = ">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n";
for (Instance item : found) {
    System.out.printf(rowFormat,
            item.getServiceName(), item.getInstanceId(),
            item.getClusterName(), item.getIp(), item.getPort());
}
NacosNamingService提供了几个重载的getAllInstances方法,最重要的参数就是subscribe:当为true时,先从ServiceInfoHolder中获取服务实例信息,只有本地尚无该服务的信息时才向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取,而不再向服务端发送查询请求;当为false时,每次调用都会向服务端发送查询请求。
/**
 * Returns the instances of a service, resolved either through the subscribe path or a direct query.
 *
 * <p>When {@code subscribe} is true, the service info held by {@code serviceInfoHolder} is used if
 * present; a subscribe request is sent only when nothing is held. When it is false, a query request
 * is sent on every call.
 *
 * @param serviceName name of the service
 * @param groupName   group of the service
 * @param clusters    cluster names; joined with "," before being passed on
 * @param subscribe   true to use the subscribe path, false to send a query request
 * @return the hosts of the resolved service info, or a new empty list when it is null or has no hosts
 * @throws NacosException declared by the signature; presumably raised by the client proxy calls
 */
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    final String joinedClusters = StringUtils.join(clusters, ",");
    final ServiceInfo resolved;
    if (!subscribe) {
        // Query request.
        resolved = clientProxy.queryInstancesOfService(serviceName, groupName, joinedClusters, 0, false);
    } else {
        ServiceInfo held = serviceInfoHolder.getServiceInfo(serviceName, groupName, joinedClusters);
        if (held != null) {
            resolved = held;
        } else {
            // Subscribe request.
            resolved = clientProxy.subscribe(serviceName, groupName, joinedClusters);
        }
    }
    if (resolved == null) {
        return new ArrayList<Instance>();
    }
    List<Instance> hosts = resolved.getHosts();
    if (CollectionUtils.isEmpty(hosts)) {
        return new ArrayList<Instance>();
    }
    return hosts;
}
服务订阅
示例代码:
// Example: subscribe to ORDER_SERVICE and print the instance list carried by each event.
NacosNamingService naming = new NacosNamingService(NACOS_HOST);
EventListener instancePrinter = new EventListener() {
    @Override
    public void onEvent(Event event) {
        // The event is handled as a NamingEvent.
        NamingEvent namingEvent = (NamingEvent) event;
        System.out.println("serviceName=" + namingEvent.getServiceName());
        List<Instance> current = namingEvent.getInstances();
        System.out.printf(">> instance count=%d\n", current.size());
        String rowFormat = ">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n";
        for (Instance item : current) {
            System.out.printf(rowFormat,
                    item.getServiceName(), item.getInstanceId(),
                    item.getClusterName(), item.getIp(), item.getPort());
        }
    }
};
naming.subscribe(ORDER_SERVICE, instancePrinter);
// Block the current thread for 1200 seconds, presumably so the demo stays alive while events arrive.
TimeUnit.SECONDS.sleep(1200);
subscribe方法:
/**
 * Registers a listener for a service and then sends a subscribe request for it.
 *
 * @param serviceName name of the service
 * @param groupName   group of the service
 * @param clusters    cluster names; joined with "," before being passed on
 * @param listener    listener handed to {@code changeNotifier}
 * @throws NacosException declared by the signature; presumably raised by the subscribe request
 */
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    final String joinedClusters = StringUtils.join(clusters, ",");
    // Step 1: store the listener in the listenerMap.
    changeNotifier.registerListener(groupName, serviceName, joinedClusters, listener);
    // Step 2: send the subscribe request.
    clientProxy.subscribe(serviceName, groupName, joinedClusters);
}
实例变化的方法调用栈:
当收到服务端推送的实例变化事件时,会先触发gRPC层的onMessage回调,再由它调用已注册的观察者(observer):
// Handles one inbound response message of the call and forwards it to the observer.
// A second message on a call that is not response-streaming is rejected with an INTERNAL status.
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
// A response was already received and this call is not response-streaming: fail the call.
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
// Remember that a response has been seen so the check above can catch a second one.
firstResponseReceived = true;
// Invoke the observer with the received message.
observer.onNext(message);
if (streamingResponse && adapter.autoFlowControlEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
}
此处的observer是在创建rpc连接的时候注册的:
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConne