Strom:pluggable scheduler 自己写的自定义分配任务(二)

2014-11-24 07:55:13 · 作者: · 浏览: 2
uster,String SupervisorName) { // find out the our "special-supervisor" from the supervisor metadata Collection supervisors = cluster.getSupervisors().values(); SupervisorDetails specialSupervisor = null; for (SupervisorDetails supervisor : supervisors) { Map meta = (Map) supervisor.getSchedulerMeta(); if (meta.get("name").equals(SupervisorName)) { specialSupervisor = supervisor; break; } } return specialSupervisor; } public void schedule(Topologies topologies, Cluster cluster) { System.out.println("DemoScheduler: begin scheduling"); // Gets the topology which we want to schedule TopologyDetails topology = topologies.getByName("special-topology"); // make sure the special topology is submitted, if (topology != null) { System.out.println("special-topology is not null!!!"); if(flag==0) { boolean needsScheduling = cluster.needsScheduling(topology); // cluster.n if (needsScheduling) { System.out.println("Our special topology DOES NOT NEED scheduling."); } else { System.out.println("Our special topology needs scheduling."); // find out all the needs-scheduling components of this topology Collection Tempsupervisors = cluster.getSupervisors().values();//d for (SupervisorDetails supervisor : Tempsupervisors) { List availableSlots = cluster.getAvailableSlots(supervisor); // int Availablenum =availableSlots.size(); String suName=supervisor.getHost(); System.out.println("before:HostName:"+suName+" AvailableNum:"+Availablenum); if(!availableSlots.isEmpty()) { for (Integer port : cluster.getUsedPorts(supervisor)) { cluster.freeSlot(new WorkerSlot(supervisor.getId(), port)); } } List availableSlots2 = cluster.getAvailableSlots(supervisor); int Availablenum2 =availableSlots2.size(); System.out.println("after:HostName:"+suName+" AvailableNum:"+Availablenum2); } Map
> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology); System.out.println("needs scheduling(component->executor): " + componentToExecutors); System.out.println("needs scheduling(executor->compoenents): " + cluster.getNeedsSchedulingExecutorToComponents(topology)); SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologies.getByName("special-topology").getId()); if (currentAssignment != null) { System.out.println("current assignments: " + currentAssignment.getExecutorToSlot()); } else { System.out.println("current assignments: {}"); } if (!componentToExecutors.containsKey("spout")) { System.out.println("Our special-spout DOES NOT NEED scheduling."); } else { System.out.println("Our special-spout needs scheduling."); List executors = componentToExecutors.get("spout"); // find out the our "special-supervisor" from the supervisor metadata Collection supervisors = cluster.getSupervisors().values(); SupervisorDetails specialSupervisor = null; for (SupervisorDetails supervisor : supervisors) { Map meta = (Map) supervisor.getSchedulerMeta(); if (meta.get("name").equals("special-slave2")) { specialSupervisor = supervisor; break; } } // found the special supervisor if (specialSupervisor != null) { System.out.println("Found the special-supervisor"); List availableSlots = cluster.getAvailableSlots(specialSupervisor); // if there is no available slots on this supervisor, free some. // TODO for simplicity, we free all the used slots on the supervis