/*
 * VM-Operator
 * Copyright (C) 2023,2025 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmDefinitions;
import org.jdrupes.vmoperator.common.VmExtraData;
import org.jdrupes.vmoperator.common.VmPool;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.AssignVm;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Event;
import org.jgrapes.core.annotation.Handler;

/**
 * Watches for changes of VM definitions.
 */
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class VmMonitor extends
        AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> {

    /**
     * Orders channels so that channels whose VM has a true "Running"
     * condition come before all others. Channels in the same group
     * compare as equal.
     *
     * Unlike a hand-written "return -1 or 0" comparator, this one is
     * antisymmetric, as required by the {@link Comparator} contract.
     * {@code false} sorts before {@code true}, so the key "not running"
     * puts running VMs first.
     */
    private static final Comparator<VmChannel> preferRunning
        = Comparator.comparing((VmChannel ch) -> !ch.vmDefinition()
            .conditionStatus("Running").orElse(false));

    private final ChannelManager<String, VmChannel, ?> channelManager;

    /**
     * Instantiates a new VM definition watcher.
     *
     * @param componentChannel the component channel
     * @param channelManager the channel manager
     */
    public VmMonitor(Channel componentChannel,
            ChannelManager<String, VmChannel, ?> channelManager) {
        super(componentChannel, VmDefinition.class,
            VmDefinitions.class);
        this.channelManager = channelManager;
    }

    @Override
    protected void prepareMonitoring() throws IOException, ApiException {
        client(new K8sClient());

        // Get all our API versions
        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM);
        if (ctx.isEmpty()) {
            logger.severe(() -> "Cannot get CRD context.");
            return;
        }
        context(ctx.get());

        // Remove left over resources
        purge();
    }

    /**
     * Deletes stateful sets and config maps that are labeled as managed
     * by this operator ({@code app.kubernetes.io/managed-by} and
     * {@code app.kubernetes.io/name}) and whose name does not match the
     * name of any existing VM definition in the namespace.
     *
     * Note that a resource whose model cannot be obtained is treated as
     * having the name "(unknown)" and is therefore deleted as well.
     *
     * @throws ApiException if listing or deleting fails
     */
    @SuppressWarnings("PMD.CognitiveComplexity")
    private void purge() throws ApiException {
        // Get existing CRs (VMs)
        var known = K8sDynamicStub.list(client(), context(), namespace())
            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
        ListOptions opts = new ListOptions();
        opts.setLabelSelector(
            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
                + "app.kubernetes.io/name=" + APP_NAME);
        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
            K8sV1ConfigMapStub.CONTEXT)) {
            for (var resStub : K8sDynamicStub.list(client(), context,
                namespace(), opts)) {
                String instance = resStub.model()
                    .map(m -> m.metadata().getName()).orElse("(unknown)");
                if (!known.contains(instance)) {
                    resStub.delete();
                }
            }
        }
    }

    /**
     * Handles a watch event for a VM definition.
     *
     * Looks up the VM's channel, creating one if necessary. It then
     * obtains the full definition, fetching it explicitly if the event
     * carries no data, and caches it in the channel. Finally it fires a
     * {@link VmDefChanged} on the channel's pipeline. For DELETED
     * events, the channel is removed from the channel manager once the
     * event has completed.
     *
     * If no definition is available, neither from the event, nor by
     * fetching, nor from the channel's cache, a warning is logged and
     * the event is dropped.
     *
     * @param client the client
     * @param response the watch response
     */
    @Override
    protected void handleChange(K8sClient client,
            Watch.Response<VmDefinition> response) {
        V1ObjectMeta metadata = response.object.getMetadata();
        AtomicBoolean toBeAdded = new AtomicBoolean(false);
        VmChannel channel = channelManager.channel(metadata.getName())
            .orElseGet(() -> {
                toBeAdded.set(true);
                return channelManager.createChannel(metadata.getName());
            });

        // Get full definition and associate with channel as backup
        var vmDef = response.object;
        if (vmDef.data() == null) {
            // ADDED event does not provide data, see
            // https://github.com/kubernetes-client/java/issues/3215
            // getModel may return null (e.g. VM already gone).
            vmDef = getModel(client, vmDef);
        }
        if (vmDef != null && vmDef.data() != null) {
            // New data, augment and save
            addExtraData(channel.client(), vmDef, channel.vmDefinition());
            channel.setVmDefinition(vmDef);
        } else {
            // Reuse cached (e.g. if deleted or if fetching failed)
            vmDef = channel.vmDefinition();
        }
        if (vmDef == null) {
            logger.warning(() -> "Cannot get definition for "
                + response.object.getMetadata());
            return;
        }
        if (toBeAdded.get()) {
            channelManager.put(vmDef.name(), channel);
        }

        // Create and fire changed event. Remove channel from channel
        // manager on completion.
        var responseType = ResponseType.valueOf(response.type);
        VmDefChanged chgEvt = new VmDefChanged(responseType,
            channel.setGeneration(metadata.getGeneration()), vmDef);
        if (responseType == ResponseType.DELETED) {
            chgEvt = Event.onCompletion(chgEvt,
                e -> channelManager.remove(e.vmDefinition().name()));
        }
        channel.pipeline().fire(chgEvt, channel);
    }

    /**
     * Retrieves the VM definition with the same name as the given one
     * from the API server.
     *
     * @param client the client
     * @param vmDef the (incomplete) definition providing the name
     * @return the retrieved definition, or {@code null} if it is not
     * available or the request failed
     */
    private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
        try {
            return VmDefinitionStub.get(client, context(), namespace(),
                vmDef.metadata().getName()).model().orElse(null);
        } catch (ApiException e) {
            // Best effort: the caller falls back to the cached definition,
            // but record why the lookup failed.
            logger.log(Level.FINE, e, () -> "Cannot retrieve definition of "
                + vmDef.metadata().getName() + ": " + e.getMessage());
            return null;
        }
    }

    /**
     * Creates the {@link VmExtraData} for the given definition. The
     * constructor presumably associates it with {@code vmDef}; verify
     * in {@code VmExtraData}.
     *
     * The reset count is carried over from {@code prevState}, or set to
     * 0 if there is none. If the VM's "Running" condition is true, the
     * node name and pod IP addresses of the VM's pod(s) are added as
     * well.
     *
     * @param client the client used to look up the pod(s)
     * @param vmDef the new VM definition
     * @param prevState the previously cached definition, may be
     * {@code null}
     */
    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
    private void addExtraData(K8sClient client, VmDefinition vmDef,
            VmDefinition prevState) {
        var extra = new VmExtraData(vmDef);

        // Maintain (or initialize) the resetCount
        extra.resetCount(
            Optional.ofNullable(prevState).flatMap(VmDefinition::extra)
                .map(VmExtraData::resetCount).orElse(0L));

        // VM definition status changes before the pod terminates.
        // This results in pod information being shown for a stopped
        // VM which is irritating. So check condition first.
        if (!vmDef.conditionStatus("Running").orElse(false)) {
            return;
        }

        // Get pod and extract node information.
        var podSearch = new ListOptions();
        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
            + ",app.kubernetes.io/component=" + APP_NAME
            + ",app.kubernetes.io/instance=" + vmDef.name());
        try {
            var podList
                = K8sV1PodStub.list(client, namespace(), podSearch);
            for (var podStub : podList) {
                // Obtain the model once; it may be absent (e.g. the pod
                // vanished after listing), in which case there is
                // nothing to report.
                var podModel = podStub.model();
                if (podModel.isEmpty()) {
                    continue;
                }
                var pod = podModel.get();
                var nodeName = pod.getSpec().getNodeName();
                logger.fine(() -> "Adding node name " + nodeName
                    + " to VM info for " + vmDef.name());
                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
                var addrs = new ArrayList<String>();
                // Status or pod IPs may not (yet) be set; treat as none.
                Optional.ofNullable(pod.getStatus())
                    .map(status -> status.getPodIPs())
                    .ifPresent(ips -> ips.stream().map(ip -> ip.getIp())
                        .forEach(addrs::add));
                logger.fine(() -> "Adding node addresses " + addrs
                    + " to VM info for " + vmDef.name());
                extra.nodeInfo(nodeName, addrs);
            }
        } catch (ApiException e) {
            logger.log(Level.WARNING, e,
                () -> "Cannot access node information: " + e.getMessage());
        }
    }

    /**
     * Sets the event's result to the data of all known VMs that match
     * the event's criteria.
     *
     * The criteria are optional: a name, a user and/or roles with at
     * least one permission on the VM, the pool the VM was assigned
     * from, and the user it is assigned to. If neither user nor roles
     * are given, permissions are not checked.
     *
     * @param event the event
     */
    @Handler
    public void onGetVms(GetVms event) {
        event.setResult(channelManager.channels().stream()
            .filter(c -> event.name().isEmpty()
                || c.vmDefinition().name().equals(event.name().get()))
            .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
                || !c.vmDefinition().permissionsFor(event.user().orElse(null),
                    event.roles()).isEmpty())
            .filter(c -> event.fromPool().isEmpty()
                || c.vmDefinition().assignedFrom()
                    .map(p -> p.equals(event.fromPool().get())).orElse(false))
            .filter(c -> event.toUser().isEmpty()
                || c.vmDefinition().assignedTo()
                    .map(u -> u.equals(event.toUser().get())).orElse(false))
            .map(c -> new VmData(c.vmDefinition(), c))
            .toList());
    }

    /**
     * Assigns a VM from the requested pool to the requested user,
     * unless such an assignment already exists.
     *
     * If an assignment exists, it is returned as the result. Otherwise
     * the pool is looked up, and among the assignable VMs the one used
     * least recently is chosen. VMs that were never used count as used
     * at the epoch; ties are broken in favor of running VMs. If the
     * assignment update succeeds, the VM is returned as the result and
     * is requested to be running. If it does not succeed, the whole
     * procedure is retried.
     *
     * No result is set if the pool does not exist or no VM is
     * available.
     *
     * @param event the event
     * @throws ApiException the api exception
     * @throws InterruptedException if interrupted while waiting for the
     * result of a fired event
     */
    @Handler
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onAssignVm(AssignVm event)
            throws ApiException, InterruptedException {
        while (true) {
            // Search for existing assignment.
            var vmQuery = channelManager.channels().stream()
                .filter(c -> c.vmDefinition().assignedFrom()
                    .map(p -> p.equals(event.fromPool())).orElse(false))
                .filter(c -> c.vmDefinition().assignedTo()
                    .map(u -> u.equals(event.toUser())).orElse(false))
                .findFirst();
            if (vmQuery.isPresent()) {
                var vmDef = vmQuery.get().vmDefinition();
                event.setResult(new VmData(vmDef, vmQuery.get()));
                return;
            }

            // Get the pool definition for checking possible assignment
            VmPool vmPool = newEventPipeline().fire(new GetPools()
                .withName(event.fromPool())).get().stream().findFirst()
                .orElse(null);
            if (vmPool == null) {
                return;
            }

            // Find available VM: least recently used first, prefer
            // running VMs on ties. min() yields the same element as
            // sorted().findFirst() without sorting the whole list.
            vmQuery = channelManager.channels().stream()
                .filter(c -> vmPool.isAssignable(c.vmDefinition()))
                .min(Comparator.comparing((VmChannel c) -> c.vmDefinition()
                    .assignmentLastUsed().orElse(Instant.ofEpochSecond(0)))
                    .thenComparing(preferRunning));

            // None found
            if (vmQuery.isEmpty()) {
                return;
            }

            // Assign to user
            var chosenVm = vmQuery.get();
            var vmPipeline = chosenVm.pipeline();
            if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment(
                vmPool.name(), event.toUser()), chosenVm).get())
                .orElse(false)) {
                var vmDef = chosenVm.vmDefinition();
                event.setResult(new VmData(vmDef, chosenVm));

                // Make sure that a newly assigned VM is running.
                chosenVm.pipeline().fire(new ModifyVm(vmDef.name(),
                    "state", "Running", chosenVm));
                return;
            }
        }
    }

}