ceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
// 推送一个ServiceSubscribedEvent事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl进行处理。
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// 触发一次订阅者回调,把被订阅的服务的信息推送给订阅者
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
取消服务订阅
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
client.removeServiceSubscriber(singleton);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}
推送一个ClientUnsubscribeServiceEvent事件,还是使用ClientServiceIndexesManager来处理,移除订阅关系。