001/* 002 * VM-Operator 003 * Copyright (C) 2023,2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.custom.V1Patch; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch; 026import io.kubernetes.client.util.generic.options.ListOptions; 027import java.io.IOException; 028import java.net.HttpURLConnection; 029import java.time.Instant; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.Optional; 033import java.util.Set; 034import java.util.stream.Collectors; 035import org.jdrupes.vmoperator.common.Constants.Crd; 036import org.jdrupes.vmoperator.common.Constants.Status; 037import org.jdrupes.vmoperator.common.K8s; 038import org.jdrupes.vmoperator.common.K8sClient; 039import org.jdrupes.vmoperator.common.K8sDynamicStub; 040import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 041import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; 042import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; 043import org.jdrupes.vmoperator.common.VmDefinition; 044import org.jdrupes.vmoperator.common.VmDefinitionStub; 045import org.jdrupes.vmoperator.common.VmDefinitions; 046import org.jdrupes.vmoperator.common.VmExtraData; 047import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; 048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; 049import org.jdrupes.vmoperator.manager.events.ChannelManager; 050import org.jdrupes.vmoperator.manager.events.ModifyVm; 051import org.jdrupes.vmoperator.manager.events.PodChanged; 052import org.jdrupes.vmoperator.manager.events.UpdateAssignment; 053import org.jdrupes.vmoperator.manager.events.VmChannel; 054import org.jdrupes.vmoperator.manager.events.VmResourceChanged; 055import org.jdrupes.vmoperator.util.GsonPtr; 056import org.jgrapes.core.Channel; 057import org.jgrapes.core.Event; 058import org.jgrapes.core.EventPipeline; 059import org.jgrapes.core.annotation.Handler; 060 061/** 062 * Watches for changes of VM definitions. When a VM definition (CR) 063 * becomes known, is is registered with a {@link ChannelManager} and thus 064 * gets an associated {@link VmChannel} and an associated 065 * {@link EventPipeline}. 066 * 067 * The {@link EventPipeline} is used for submitting an action that processes 068 * the change data from kubernetes, eventually transforming it to a 069 * {@link VmResourceChanged} event that is handled by another 070 * {@link EventPipeline} associated with the {@link VmChannel}. This 071 * event pipeline should be used for all events related to changes of 072 * a particular VM. 073 */ 074@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 075public class VmMonitor extends 076 AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> { 077 078 private final ChannelManager<String, VmChannel, 079 EventPipeline> channelManager; 080 081 /** 082 * Instantiates a new VM definition watcher. 083 * 084 * @param componentChannel the component channel 085 * @param channelManager the channel manager 086 */ 087 public VmMonitor(Channel componentChannel, 088 ChannelManager<String, VmChannel, EventPipeline> channelManager) { 089 super(componentChannel, VmDefinition.class, 090 VmDefinitions.class); 091 this.channelManager = channelManager; 092 } 093 094 @Override 095 protected void prepareMonitoring() throws IOException, ApiException { 096 client(new K8sClient()); 097 098 // Get all our API versions 099 var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM); 100 if (ctx.isEmpty()) { 101 logger.severe(() -> "Cannot get CRD context."); 102 return; 103 } 104 context(ctx.get()); 105 106 // Remove left over resources 107 purge(); 108 } 109 110 @SuppressWarnings("PMD.CognitiveComplexity") 111 private void purge() throws ApiException { 112 // Get existing CRs (VMs) 113 var known = K8sDynamicStub.list(client(), context(), namespace()) 114 .stream().map(stub -> stub.name()).collect(Collectors.toSet()); 115 ListOptions opts = new ListOptions(); 116 opts.setLabelSelector( 117 "app.kubernetes.io/managed-by=" + VM_OP_NAME + "," 118 + "app.kubernetes.io/name=" + APP_NAME); 119 for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT, 120 K8sV1ConfigMapStub.CONTEXT)) { 121 for (var resStub : K8sDynamicStub.list(client(), context, 122 namespace(), opts)) { 123 String instance = resStub.model() 124 .map(m -> m.metadata().getName()).orElse("(unknown)"); 125 if (!known.contains(instance)) { 126 resStub.delete(); 127 } 128 } 129 } 130 } 131 132 @Override 133 protected void handleChange(K8sClient client, 134 Watch.Response<VmDefinition> response) { 135 var name = response.object.getMetadata().getName(); 136 137 // Process the response data on a VM specific pipeline to 138 // increase concurrency when e.g. starting many VMs. 139 var preparing = channelManager.associated(name) 140 .orElseGet(() -> newEventPipeline()); 141 preparing.submit("VmChange[" + name + "]", 142 () -> processChange(client, response, preparing)); 143 } 144 145 private void processChange(K8sClient client, 146 Watch.Response<VmDefinition> response, EventPipeline preparing) { 147 // Get full definition and associate with channel as backup 148 var vmDef = response.object; 149 if (vmDef.data() == null) { 150 // ADDED event does not provide data, see 151 // https://github.com/kubernetes-client/java/issues/3215 152 vmDef = getModel(client, vmDef); 153 } 154 var name = response.object.getMetadata().getName(); 155 var channel = channelManager.channel(name) 156 .orElseGet(() -> channelManager.createChannel(name)); 157 if (vmDef.data() != null) { 158 // New data, augment and save 159 addExtraData(vmDef, channel.vmDefinition()); 160 channel.setVmDefinition(vmDef); 161 } else { 162 // Reuse cached (e.g. if deleted) 163 vmDef = channel.vmDefinition(); 164 } 165 if (vmDef == null) { 166 logger.warning(() -> "Cannot get defintion for " 167 + response.object.getMetadata()); 168 return; 169 } 170 channelManager.put(name, channel, preparing); 171 172 // Create and fire changed event. Remove channel from channel 173 // manager on completion. 174 VmResourceChanged chgEvt 175 = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef, 176 channel.setGeneration(response.object.getMetadata() 177 .getGeneration()), 178 false); 179 if (ResponseType.valueOf(response.type) == ResponseType.DELETED) { 180 chgEvt = Event.onCompletion(chgEvt, 181 e -> channelManager.remove(e.vmDefinition().name())); 182 } 183 channel.fire(chgEvt); 184 } 185 186 private VmDefinition getModel(K8sClient client, VmDefinition vmDef) { 187 try { 188 return VmDefinitionStub.get(client, context(), namespace(), 189 vmDef.metadata().getName()).model().orElse(null); 190 } catch (ApiException e) { 191 return null; 192 } 193 } 194 195 @SuppressWarnings("PMD.AvoidDuplicateLiterals") 196 private void addExtraData(VmDefinition vmDef, VmDefinition prevState) { 197 var extra = new VmExtraData(vmDef); 198 var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra); 199 200 // Maintain (or initialize) the resetCount 201 extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L)); 202 203 // Maintain node info 204 prevExtra 205 .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses())); 206 } 207 208 /** 209 * On pod changed. 210 * 211 * @param event the event 212 * @param channel the channel 213 */ 214 @Handler 215 public void onPodChanged(PodChanged event, VmChannel channel) { 216 var vmDef = channel.vmDefinition(); 217 218 // Make sure that this is properly sync'd with VM CR changes. 219 channelManager.associated(vmDef.name()) 220 .orElseGet(() -> activeEventPipeline()) 221 .submit("NodeInfo[" + vmDef.name() + "]", 222 () -> { 223 updateNodeInfo(event, vmDef); 224 channel.fire(new VmResourceChanged(ResponseType.MODIFIED, 225 vmDef, false, true)); 226 }); 227 } 228 229 private void updateNodeInfo(PodChanged event, VmDefinition vmDef) { 230 var extra = vmDef.extra(); 231 if (event.type() == ResponseType.DELETED) { 232 // The status of a deleted pod is the status before deletion, 233 // i.e. the node info is still cached and must be removed. 234 extra.nodeInfo("", Collections.emptyList()); 235 return; 236 } 237 238 // Get current node info from pod 239 var pod = event.pod(); 240 var nodeName = Optional 241 .ofNullable(pod.getSpec().getNodeName()).orElse(""); 242 logger.finer(() -> "Adding node name " + nodeName 243 + " to VM info for " + vmDef.name()); 244 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 245 var addrs = new ArrayList<String>(); 246 Optional.ofNullable(pod.getStatus().getPodIPs()) 247 .orElse(Collections.emptyList()).stream() 248 .map(ip -> ip.getIp()).forEach(addrs::add); 249 logger.finer(() -> "Adding node addresses " + addrs 250 + " to VM info for " + vmDef.name()); 251 extra.nodeInfo(nodeName, addrs); 252 } 253 254 /** 255 * On modify vm. 256 * 257 * @param event the event 258 * @throws ApiException the api exception 259 * @throws IOException Signals that an I/O exception has occurred. 260 */ 261 @Handler 262 public void onModifyVm(ModifyVm event, VmChannel channel) 263 throws ApiException, IOException { 264 patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(), 265 event.value()); 266 } 267 268 private void patchVmDef(K8sClient client, String name, String path, 269 Object value) throws ApiException, IOException { 270 var vmStub = K8sDynamicStub.get(client, 271 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(), 272 name); 273 274 // Patch running 275 String valueAsText = value instanceof String 276 ? "\"" + value + "\"" 277 : value.toString(); 278 var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH, 279 new V1Patch("[{\"op\": \"replace\", \"path\": \"/" 280 + path + "\", \"value\": " + valueAsText + "}]"), 281 client.defaultPatchOptions()); 282 if (!res.isPresent()) { 283 logger.warning( 284 () -> "Cannot patch definition for Vm " + vmStub.name()); 285 } 286 } 287 288 /** 289 * Attempt to Update the assignment information in the status of the 290 * VM CR. Returns true if successful. The handler does not attempt 291 * retries, because in case of failure it will be necessary to 292 * re-evaluate the chosen VM. 293 * 294 * @param event the event 295 * @param channel the channel 296 * @throws ApiException the api exception 297 */ 298 @Handler 299 public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) 300 throws ApiException { 301 try { 302 var vmDef = channel.vmDefinition(); 303 var vmStub = VmDefinitionStub.get(channel.client(), 304 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 305 vmDef.namespace(), vmDef.name()); 306 if (vmStub.updateStatus(vmDef, from -> { 307 JsonObject status = from.statusJson(); 308 if (event.toUser() == null) { 309 ((JsonObject) GsonPtr.to(status).get()) 310 .remove(Status.ASSIGNMENT); 311 } else { 312 var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); 313 assignment.set("pool", event.fromPool().name()); 314 assignment.set("user", event.toUser()); 315 assignment.set("lastUsed", Instant.now().toString()); 316 } 317 return status; 318 }).isPresent()) { 319 event.setResult(true); 320 } 321 } catch (ApiException e) { 322 // Log exceptions except for conflict, which can be expected 323 if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) { 324 throw e; 325 } 326 } 327 event.setResult(false); 328 } 329 330}