001/* 002 * VM-Operator 003 * Copyright (C) 2023,2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.custom.V1Patch; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.openapi.models.V1ObjectMeta; 026import io.kubernetes.client.util.Watch; 027import io.kubernetes.client.util.generic.options.ListOptions; 028import java.io.IOException; 029import java.net.HttpURLConnection; 030import java.time.Instant; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.Optional; 034import java.util.Set; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.stream.Collectors; 037import org.jdrupes.vmoperator.common.Constants.Crd; 038import org.jdrupes.vmoperator.common.Constants.Status; 039import org.jdrupes.vmoperator.common.K8s; 040import org.jdrupes.vmoperator.common.K8sClient; 041import org.jdrupes.vmoperator.common.K8sDynamicStub; 042import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 043import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; 044import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; 045import org.jdrupes.vmoperator.common.VmDefinition; 046import org.jdrupes.vmoperator.common.VmDefinitionStub; 047import org.jdrupes.vmoperator.common.VmDefinitions; 048import org.jdrupes.vmoperator.common.VmExtraData; 049import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; 050import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; 051import org.jdrupes.vmoperator.manager.events.ChannelManager; 052import org.jdrupes.vmoperator.manager.events.ModifyVm; 053import org.jdrupes.vmoperator.manager.events.PodChanged; 054import org.jdrupes.vmoperator.manager.events.UpdateAssignment; 055import org.jdrupes.vmoperator.manager.events.VmChannel; 056import org.jdrupes.vmoperator.manager.events.VmResourceChanged; 057import org.jdrupes.vmoperator.util.GsonPtr; 058import org.jgrapes.core.Channel; 059import org.jgrapes.core.Event; 060import org.jgrapes.core.annotation.Handler; 061 062/** 063 * Watches for changes of VM definitions. 064 */ 065@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 066public class VmMonitor extends 067 AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> { 068 069 private final ChannelManager<String, VmChannel, ?> channelManager; 070 071 /** 072 * Instantiates a new VM definition watcher. 073 * 074 * @param componentChannel the component channel 075 * @param channelManager the channel manager 076 */ 077 public VmMonitor(Channel componentChannel, 078 ChannelManager<String, VmChannel, ?> channelManager) { 079 super(componentChannel, VmDefinition.class, 080 VmDefinitions.class); 081 this.channelManager = channelManager; 082 } 083 084 @Override 085 protected void prepareMonitoring() throws IOException, ApiException { 086 client(new K8sClient()); 087 088 // Get all our API versions 089 var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM); 090 if (ctx.isEmpty()) { 091 logger.severe(() -> "Cannot get CRD context."); 092 return; 093 } 094 context(ctx.get()); 095 096 // Remove left over resources 097 purge(); 098 } 099 100 @SuppressWarnings("PMD.CognitiveComplexity") 101 private void purge() throws ApiException { 102 // Get existing CRs (VMs) 103 var known = K8sDynamicStub.list(client(), context(), namespace()) 104 .stream().map(stub -> stub.name()).collect(Collectors.toSet()); 105 ListOptions opts = new ListOptions(); 106 opts.setLabelSelector( 107 "app.kubernetes.io/managed-by=" + VM_OP_NAME + "," 108 + "app.kubernetes.io/name=" + APP_NAME); 109 for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT, 110 K8sV1ConfigMapStub.CONTEXT)) { 111 for (var resStub : K8sDynamicStub.list(client(), context, 112 namespace(), opts)) { 113 String instance = resStub.model() 114 .map(m -> m.metadata().getName()).orElse("(unknown)"); 115 if (!known.contains(instance)) { 116 resStub.delete(); 117 } 118 } 119 } 120 } 121 122 @Override 123 protected void handleChange(K8sClient client, 124 Watch.Response<VmDefinition> response) { 125 V1ObjectMeta metadata = response.object.getMetadata(); 126 AtomicBoolean toBeAdded = new AtomicBoolean(false); 127 VmChannel channel = channelManager.channel(metadata.getName()) 128 .orElseGet(() -> { 129 toBeAdded.set(true); 130 return channelManager.createChannel(metadata.getName()); 131 }); 132 133 // Get full definition and associate with channel as backup 134 var vmDef = response.object; 135 if (vmDef.data() == null) { 136 // ADDED event does not provide data, see 137 // https://github.com/kubernetes-client/java/issues/3215 138 vmDef = getModel(client, vmDef); 139 } 140 if (vmDef.data() != null) { 141 // New data, augment and save 142 addExtraData(vmDef, channel.vmDefinition()); 143 channel.setVmDefinition(vmDef); 144 } else { 145 // Reuse cached (e.g. if deleted) 146 vmDef = channel.vmDefinition(); 147 } 148 if (vmDef == null) { 149 logger.warning(() -> "Cannot get defintion for " 150 + response.object.getMetadata()); 151 return; 152 } 153 if (toBeAdded.get()) { 154 channelManager.put(vmDef.name(), channel); 155 } 156 157 // Create and fire changed event. Remove channel from channel 158 // manager on completion. 159 VmResourceChanged chgEvt 160 = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef, 161 channel.setGeneration(response.object.getMetadata() 162 .getGeneration()), 163 false); 164 if (ResponseType.valueOf(response.type) == ResponseType.DELETED) { 165 chgEvt = Event.onCompletion(chgEvt, 166 e -> channelManager.remove(e.vmDefinition().name())); 167 } 168 channel.fire(chgEvt); 169 } 170 171 private VmDefinition getModel(K8sClient client, VmDefinition vmDef) { 172 try { 173 return VmDefinitionStub.get(client, context(), namespace(), 174 vmDef.metadata().getName()).model().orElse(null); 175 } catch (ApiException e) { 176 return null; 177 } 178 } 179 180 @SuppressWarnings("PMD.AvoidDuplicateLiterals") 181 private void addExtraData(VmDefinition vmDef, VmDefinition prevState) { 182 var extra = new VmExtraData(vmDef); 183 var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra); 184 185 // Maintain (or initialize) the resetCount 186 extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L)); 187 188 // Maintain node info 189 prevExtra 190 .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses())); 191 } 192 193 /** 194 * On pod changed. 195 * 196 * @param event the event 197 * @param channel the channel 198 */ 199 @Handler 200 public void onPodChanged(PodChanged event, VmChannel channel) { 201 var vmDef = channel.vmDefinition(); 202 updateNodeInfo(event, vmDef); 203 channel 204 .fire(new VmResourceChanged(ResponseType.MODIFIED, vmDef, false, true)); 205 } 206 207 private void updateNodeInfo(PodChanged event, VmDefinition vmDef) { 208 var extra = vmDef.extra(); 209 if (event.type() == ResponseType.DELETED) { 210 // The status of a deleted pod is the status before deletion, 211 // i.e. the node info is still cached and must be removed. 212 extra.nodeInfo("", Collections.emptyList()); 213 return; 214 } 215 216 // Get current node info from pod 217 var pod = event.pod(); 218 var nodeName = Optional 219 .ofNullable(pod.getSpec().getNodeName()).orElse(""); 220 logger.finer(() -> "Adding node name " + nodeName 221 + " to VM info for " + vmDef.name()); 222 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 223 var addrs = new ArrayList<String>(); 224 Optional.ofNullable(pod.getStatus().getPodIPs()) 225 .orElse(Collections.emptyList()).stream() 226 .map(ip -> ip.getIp()).forEach(addrs::add); 227 logger.finer(() -> "Adding node addresses " + addrs 228 + " to VM info for " + vmDef.name()); 229 extra.nodeInfo(nodeName, addrs); 230 } 231 232 /** 233 * On modify vm. 234 * 235 * @param event the event 236 * @throws ApiException the api exception 237 * @throws IOException Signals that an I/O exception has occurred. 238 */ 239 @Handler 240 public void onModifyVm(ModifyVm event, VmChannel channel) 241 throws ApiException, IOException { 242 patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(), 243 event.value()); 244 } 245 246 private void patchVmDef(K8sClient client, String name, String path, 247 Object value) throws ApiException, IOException { 248 var vmStub = K8sDynamicStub.get(client, 249 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(), 250 name); 251 252 // Patch running 253 String valueAsText = value instanceof String 254 ? "\"" + value + "\"" 255 : value.toString(); 256 var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH, 257 new V1Patch("[{\"op\": \"replace\", \"path\": \"/" 258 + path + "\", \"value\": " + valueAsText + "}]"), 259 client.defaultPatchOptions()); 260 if (!res.isPresent()) { 261 logger.warning( 262 () -> "Cannot patch definition for Vm " + vmStub.name()); 263 } 264 } 265 266 /** 267 * Attempt to Update the assignment information in the status of the 268 * VM CR. Returns true if successful. The handler does not attempt 269 * retries, because in case of failure it will be necessary to 270 * re-evaluate the chosen VM. 271 * 272 * @param event the event 273 * @param channel the channel 274 * @throws ApiException the api exception 275 */ 276 @Handler 277 public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) 278 throws ApiException { 279 try { 280 var vmDef = channel.vmDefinition(); 281 var vmStub = VmDefinitionStub.get(channel.client(), 282 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 283 vmDef.namespace(), vmDef.name()); 284 if (vmStub.updateStatus(vmDef, from -> { 285 JsonObject status = from.statusJson(); 286 if (event.toUser() == null) { 287 ((JsonObject) GsonPtr.to(status).get()) 288 .remove(Status.ASSIGNMENT); 289 } else { 290 var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); 291 assignment.set("pool", event.fromPool().name()); 292 assignment.set("user", event.toUser()); 293 assignment.set("lastUsed", Instant.now().toString()); 294 } 295 return status; 296 }).isPresent()) { 297 event.setResult(true); 298 } 299 } catch (ApiException e) { 300 // Log exceptions except for conflict, which can be expected 301 if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) { 302 throw e; 303 } 304 } 305 event.setResult(false); 306 } 307 308}