001/* 002 * VM-Operator 003 * Copyright (C) 2023,2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.custom.V1Patch; 024import io.kubernetes.client.openapi.ApiException; 025import io.kubernetes.client.util.Watch; 026import io.kubernetes.client.util.generic.options.ListOptions; 027import java.io.IOException; 028import java.net.HttpURLConnection; 029import java.time.Instant; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.Optional; 033import java.util.Set; 034import java.util.stream.Collectors; 035import org.jdrupes.vmoperator.common.Constants.Crd; 036import org.jdrupes.vmoperator.common.Constants.Status; 037import org.jdrupes.vmoperator.common.K8s; 038import org.jdrupes.vmoperator.common.K8sClient; 039import org.jdrupes.vmoperator.common.K8sDynamicStub; 040import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 041import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; 042import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; 043import org.jdrupes.vmoperator.common.VmDefinition; 044import org.jdrupes.vmoperator.common.VmDefinitionStub; 045import org.jdrupes.vmoperator.common.VmDefinitions; 046import org.jdrupes.vmoperator.common.VmExtraData; 047import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; 048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; 049import org.jdrupes.vmoperator.manager.events.ChannelManager; 050import org.jdrupes.vmoperator.manager.events.ModifyVm; 051import org.jdrupes.vmoperator.manager.events.PodChanged; 052import org.jdrupes.vmoperator.manager.events.UpdateAssignment; 053import org.jdrupes.vmoperator.manager.events.VmChannel; 054import org.jdrupes.vmoperator.manager.events.VmResourceChanged; 055import org.jdrupes.vmoperator.util.GsonPtr; 056import org.jgrapes.core.Channel; 057import org.jgrapes.core.Event; 058import org.jgrapes.core.EventPipeline; 059import org.jgrapes.core.annotation.Handler; 060 061/** 062 * Watches for changes of VM definitions. When a VM definition (CR) 063 * becomes known, is is registered with a {@link ChannelManager} and thus 064 * gets an associated {@link VmChannel} and an associated 065 * {@link EventPipeline}. 066 * 067 * The {@link EventPipeline} is used for submitting an action that processes 068 * the change data from kubernetes, eventually transforming it to a 069 * {@link VmResourceChanged} event that is handled by another 070 * {@link EventPipeline} associated with the {@link VmChannel}. This 071 * event pipeline should be used for all events related to changes of 072 * a particular VM. 073 */ 074public class VmMonitor extends 075 AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> { 076 077 private final ChannelManager<String, VmChannel, 078 EventPipeline> channelManager; 079 080 /** 081 * Instantiates a new VM definition watcher. 082 * 083 * @param componentChannel the component channel 084 * @param channelManager the channel manager 085 */ 086 public VmMonitor(Channel componentChannel, 087 ChannelManager<String, VmChannel, EventPipeline> channelManager) { 088 super(componentChannel, VmDefinition.class, 089 VmDefinitions.class); 090 this.channelManager = channelManager; 091 } 092 093 @Override 094 protected void prepareMonitoring() throws IOException, ApiException { 095 client(new K8sClient()); 096 097 // Get all our API versions 098 var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM); 099 if (ctx.isEmpty()) { 100 logger.severe(() -> "Cannot get CRD context."); 101 return; 102 } 103 context(ctx.get()); 104 105 // Remove left over resources 106 purge(); 107 } 108 109 private void purge() throws ApiException { 110 // Get existing CRs (VMs) 111 var known = K8sDynamicStub.list(client(), context(), namespace()) 112 .stream().map(stub -> stub.name()).collect(Collectors.toSet()); 113 ListOptions opts = new ListOptions(); 114 opts.setLabelSelector( 115 "app.kubernetes.io/managed-by=" + VM_OP_NAME + "," 116 + "app.kubernetes.io/name=" + APP_NAME); 117 for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT, 118 K8sV1ConfigMapStub.CONTEXT)) { 119 for (var resStub : K8sDynamicStub.list(client(), context, 120 namespace(), opts)) { 121 String instance = resStub.model() 122 .map(m -> m.metadata().getName()).orElse("(unknown)"); 123 if (!known.contains(instance)) { 124 resStub.delete(); 125 } 126 } 127 } 128 } 129 130 @Override 131 protected void handleChange(K8sClient client, 132 Watch.Response<VmDefinition> response) { 133 var name = response.object.getMetadata().getName(); 134 135 // Process the response data on a VM specific pipeline to 136 // increase concurrency when e.g. starting many VMs. 137 var preparing = channelManager.associated(name) 138 .orElseGet(() -> newEventPipeline()); 139 preparing.submit("VmChange[" + name + "]", 140 () -> processChange(client, response, preparing)); 141 } 142 143 private void processChange(K8sClient client, 144 Watch.Response<VmDefinition> response, EventPipeline preparing) { 145 // Get full definition and associate with channel as backup 146 var vmDef = response.object; 147 if (vmDef.data() == null) { 148 // ADDED event does not provide data, see 149 // https://github.com/kubernetes-client/java/issues/3215 150 vmDef = getModel(client, vmDef); 151 } 152 var name = response.object.getMetadata().getName(); 153 var channel = channelManager.channel(name) 154 .orElseGet(() -> channelManager.createChannel(name)); 155 if (vmDef.data() != null) { 156 // New data, augment and save 157 addExtraData(vmDef, channel.vmDefinition()); 158 channel.setVmDefinition(vmDef); 159 } else { 160 // Reuse cached (e.g. if deleted) 161 vmDef = channel.vmDefinition(); 162 } 163 if (vmDef == null) { 164 logger.warning(() -> "Cannot get defintion for " 165 + response.object.getMetadata()); 166 return; 167 } 168 channelManager.put(name, channel, preparing); 169 170 // Create and fire changed event. Remove channel from channel 171 // manager on completion. 172 VmResourceChanged chgEvt 173 = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef, 174 channel.setGeneration(response.object.getMetadata() 175 .getGeneration()), 176 false); 177 if (ResponseType.valueOf(response.type) == ResponseType.DELETED) { 178 chgEvt = Event.onCompletion(chgEvt, 179 e -> channelManager.remove(e.vmDefinition().name())); 180 } 181 channel.fire(chgEvt); 182 } 183 184 private VmDefinition getModel(K8sClient client, VmDefinition vmDef) { 185 try { 186 return VmDefinitionStub.get(client, context(), namespace(), 187 vmDef.metadata().getName()).model().orElse(null); 188 } catch (ApiException e) { 189 return null; 190 } 191 } 192 193 private void addExtraData(VmDefinition vmDef, VmDefinition prevState) { 194 var extra = new VmExtraData(vmDef); 195 var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra); 196 197 // Maintain (or initialize) the resetCount 198 extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L)); 199 200 // Maintain node info 201 prevExtra 202 .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses())); 203 } 204 205 /** 206 * On pod changed. 207 * 208 * @param event the event 209 * @param channel the channel 210 */ 211 @Handler 212 public void onPodChanged(PodChanged event, VmChannel channel) { 213 var vmDef = channel.vmDefinition(); 214 215 // Make sure that this is properly sync'd with VM CR changes. 216 channelManager.associated(vmDef.name()) 217 .orElseGet(() -> activeEventPipeline()) 218 .submit("NodeInfo[" + vmDef.name() + "]", 219 () -> { 220 updateNodeInfo(event, vmDef); 221 channel.fire(new VmResourceChanged(ResponseType.MODIFIED, 222 vmDef, false, true)); 223 }); 224 } 225 226 private void updateNodeInfo(PodChanged event, VmDefinition vmDef) { 227 var extra = vmDef.extra(); 228 if (event.type() == ResponseType.DELETED) { 229 // The status of a deleted pod is the status before deletion, 230 // i.e. the node info is still cached and must be removed. 231 extra.nodeInfo("", Collections.emptyList()); 232 return; 233 } 234 235 // Get current node info from pod 236 var pod = event.pod(); 237 var nodeName = Optional 238 .ofNullable(pod.getSpec().getNodeName()).orElse(""); 239 logger.finer(() -> "Adding node name " + nodeName 240 + " to VM info for " + vmDef.name()); 241 var addrs = new ArrayList<String>(); 242 Optional.ofNullable(pod.getStatus().getPodIPs()) 243 .orElse(Collections.emptyList()).stream() 244 .map(ip -> ip.getIp()).forEach(addrs::add); 245 logger.finer(() -> "Adding node addresses " + addrs 246 + " to VM info for " + vmDef.name()); 247 extra.nodeInfo(nodeName, addrs); 248 } 249 250 /** 251 * On modify vm. 252 * 253 * @param event the event 254 * @throws ApiException the api exception 255 * @throws IOException Signals that an I/O exception has occurred. 256 */ 257 @Handler 258 public void onModifyVm(ModifyVm event, VmChannel channel) 259 throws ApiException, IOException { 260 patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(), 261 event.value()); 262 } 263 264 private void patchVmDef(K8sClient client, String name, String path, 265 Object value) throws ApiException, IOException { 266 var vmStub = K8sDynamicStub.get(client, 267 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(), 268 name); 269 270 // Patch running 271 String valueAsText = value instanceof String 272 ? "\"" + value + "\"" 273 : value.toString(); 274 var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH, 275 new V1Patch("[{\"op\": \"replace\", \"path\": \"/" 276 + path + "\", \"value\": " + valueAsText + "}]"), 277 client.defaultPatchOptions()); 278 if (!res.isPresent()) { 279 logger.warning( 280 () -> "Cannot patch definition for Vm " + vmStub.name()); 281 } 282 } 283 284 /** 285 * Attempt to Update the assignment information in the status of the 286 * VM CR. Returns true if successful. The handler does not attempt 287 * retries, because in case of failure it will be necessary to 288 * re-evaluate the chosen VM. 289 * 290 * @param event the event 291 * @param channel the channel 292 * @throws ApiException the api exception 293 */ 294 @Handler 295 public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) 296 throws ApiException { 297 try { 298 var vmDef = channel.vmDefinition(); 299 var vmStub = VmDefinitionStub.get(channel.client(), 300 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 301 vmDef.namespace(), vmDef.name()); 302 if (vmStub.updateStatus(vmDef, from -> { 303 JsonObject status = from.statusJson(); 304 if (event.toUser() == null) { 305 ((JsonObject) GsonPtr.to(status).get()) 306 .remove(Status.ASSIGNMENT); 307 } else { 308 var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); 309 assignment.set("pool", event.fromPool().name()); 310 assignment.set("user", event.toUser()); 311 assignment.set("lastUsed", Instant.now().toString()); 312 } 313 return status; 314 }).isPresent()) { 315 event.setResult(true); 316 } 317 } catch (ApiException e) { 318 // Log exceptions except for conflict, which can be expected 319 if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) { 320 throw e; 321 } 322 } 323 event.setResult(false); 324 } 325 326}