/*
 * VM-Operator
 * Copyright (C) 2023,2025 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.runner.qemu;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.custom.Quantity.Format;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.JSON;
import io.kubernetes.client.openapi.models.EventsV1Event;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Optional;
import java.util.logging.Level;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.Constants.Status.Condition;
import org.jdrupes.vmoperator.common.Constants.Status.Condition.Reason;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.runner.qemu.events.BalloonChangeEvent;
import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
import org.jdrupes.vmoperator.runner.qemu.events.DisplayPasswordChanged;
import org.jdrupes.vmoperator.runner.qemu.events.Exit;
import org.jdrupes.vmoperator.runner.qemu.events.HotpluggableCpuStatus;
import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.RunState;
import org.jdrupes.vmoperator.runner.qemu.events.ShutdownEvent;
import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentConnected;
import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedIn;
import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedOut;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.HandlingError;
import org.jgrapes.core.events.Start;

/**
 * Updates the CR status.
 *
 * The component listens to runner events (state changes, balloon and
 * CPU changes, agent events etc.) and writes the derived information
 * into the status of the VM's custom resource. Handlers that write to
 * the status do nothing while no VM stub is available, i.e. while
 * there is no connection to kubernetes.
 */
@SuppressWarnings({ "PMD.CouplingBetweenObjects" })
public class StatusUpdater extends VmDefUpdater {

    @SuppressWarnings("PMD.FieldNamingConventions")
    private static final Gson gson = new JSON().getGson();
    @SuppressWarnings("PMD.FieldNamingConventions")
    private static final ObjectMapper objectMapper
        = new ObjectMapper().registerModule(new JavaTimeModule());

    /** Taken from the configuration, see {@link #onConfigureQemu}. */
    private boolean guestShutdownStops;
    /** Set by the latest {@link ShutdownEvent}. */
    private boolean shutdownByGuest;
    /** The stub used to access the VM resource, `null` if unavailable. */
    private VmDefinitionStub vmStub;
    /** The user that should be logged in according to the configuration. */
    private String loggedInUser;
    /** The most recently reported balloon size. */
    private BigInteger lastRamValue;
    /** The time when the most recent balloon change was reported. */
    private Instant lastRamChange;
    /** Pending deferred RAM status update, `null` if none is scheduled. */
    private Timer balloonTimer;
    /** The configured current RAM, i.e. the balloon's target size. */
    private BigInteger targetRamValue;

    /**
     * Instantiates a new status updater and attaches a
     * {@link ConsoleTracker} as child component.
     *
     * @param componentChannel the component channel
     */
    public StatusUpdater(Channel componentChannel) {
        super(componentChannel);
        attach(new ConsoleTracker(componentChannel));
    }

    /**
     * Logs handling errors caused by an {@link ApiException} as warning
     * (including the response body) and stops further processing of
     * the error event. Other errors are not touched.
     *
     * @param event the event
     */
    @Handler(channels = Channel.class)
    public void onHandlingError(HandlingError event) {
        if (event.throwable() instanceof ApiException exc) {
            logger.log(Level.WARNING, exc,
                () -> "Problem accessing kubernetes: " + exc.getResponseBody());
            event.stop();
        }
    }

    /**
     * Handle the start event. Obtains the stub for the VM resource and
     * initializes the status: the runner version is recorded and
     * a left over logged in user is removed. Does nothing if no
     * namespace is known. If the VM resource cannot be accessed, the
     * start event is cancelled and an {@link Exit} event with
     * code 1 is fired.
     *
     * @param event the event
     */
    @Handler
    public void onStart(Start event) {
        if (namespace == null) {
            return;
        }
        try {
            vmStub = VmDefinitionStub.get(apiClient,
                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
                namespace, vmName);
            var vmDef = vmStub.model().orElse(null);
            if (vmDef == null) {
                return;
            }
            vmStub.updateStatus(from -> {
                JsonObject status = from.statusJson();
                status.addProperty(Status.RUNNER_VERSION, Optional.ofNullable(
                    Runner.class.getPackage().getImplementationVersion())
                    .orElse("(unknown)"));
                status.remove(Status.LOGGED_IN_USER);
                return status;
            });
        } catch (ApiException e) {
            logger.log(Level.SEVERE, e,
                () -> "Cannot access VM object, terminating.");
            event.cancel(true);
            fire(new Exit(1));
        }
    }

    /**
     * On runner configuration update. Remembers the configuration
     * values needed by other handlers. If a VM stub is available,
     * additionally resets the display password serial to -1 if there
     * is no display password, sets the observed generation of the
     * "running" condition and re-evaluates the "user logged in"
     * condition.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onConfigureQemu(ConfigureQemu event)
            throws ApiException {
        guestShutdownStops = event.configuration().guestShutdownStops;
        loggedInUser = event.configuration().vm.display.loggedInUser;
        targetRamValue = event.configuration().vm.currentRam;

        // Remainder applies only if we have a connection to k8s.
        if (vmStub == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            if (!event.configuration().hasDisplayPassword) {
                status.addProperty(Status.DISPLAY_PASSWORD_SERIAL, -1);
            }
            status.getAsJsonArray("conditions").asList().stream()
                .map(cond -> (JsonObject) cond)
                .filter(cond -> Condition.RUNNING
                    .equals(cond.get("type").getAsString()))
                .forEach(cond -> cond.addProperty("observedGeneration",
                    from.getMetadata().getGeneration()));
            updateUserLoggedIn(from);
            return status;
        });
    }

    /**
     * On runner state changed. Updates the "running" and "booted"
     * conditions together with the RAM and CPU values in the status.
     * If the VM is not running (any more), the information that is
     * only valid for a running VM (console client, logged in user,
     * OS info, agent connection) is reset as well. Finally, the VM's
     * state is patched to "Stopped" if the guest initiated the
     * shutdown and the configuration requests this, and a kubernetes
     * event reporting the state change is created.
     *
     * @param event the event
     * @throws ApiException if accessing kubernetes fails
     */
    @Handler
    @SuppressWarnings({ "PMD.AssignmentInOperand" })
    public void onRunnerStateChanged(RunnerStateChange event)
            throws ApiException {
        VmDefinition vmDef;
        if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            boolean running = event.runState().vmRunning();
            updateCondition(vmDef, Condition.RUNNING, running, event.reason(),
                event.message());
            JsonObject status = updateCondition(vmDef, Condition.BOOTED,
                event.runState() == RunState.BOOTED, event.reason(),
                event.message());
            if (event.runState() == RunState.STARTING) {
                status.addProperty(Status.RAM, GsonPtr.to(from.data())
                    .getAsString("spec", "vm", "maximumRam").orElse("0"));
                status.addProperty(Status.CPUS, 1);
            } else if (event.runState() == RunState.STOPPED) {
                status.addProperty(Status.RAM, "0");
                status.addProperty(Status.CPUS, 0);
                status.remove(Status.LOGGED_IN_USER);
            }

            if (!running) {
                // In case console connection was still present
                status.addProperty(Status.CONSOLE_CLIENT, "");
                updateCondition(from, Condition.CONSOLE_CONNECTED, false,
                    "VmStopped",
                    "The VM is not running");

                // In case we had an irregular shutdown
                updateCondition(from, Condition.USER_LOGGED_IN, false,
                    "VmStopped", "The VM is not running");
                status.remove(Status.OSINFO);
                updateCondition(vmDef, "VmopAgentConnected", false, "VmStopped",
                    "The VM is not running");
            }
            return status;
        }, vmDef);

        // Maybe stop VM
        if (event.runState() == RunState.TERMINATING && !event.failed()
            && guestShutdownStops && shutdownByGuest) {
            logger.info(() -> "Stopping VM because of shutdown by guest.");
            var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
                new V1Patch("[{\"op\": \"replace\", \"path\": \"/spec/vm/state"
                    + "\", \"value\": \"Stopped\"}]"),
                apiClient.defaultPatchOptions());
            if (!res.isPresent()) {
                // The patch targets the VM's state, not pod annotations.
                logger.warning(
                    () -> "Cannot patch VM state for: " + vmStub.name());
            }
        }

        // Log event
        var evt = new EventsV1Event()
            .reportingController(Crd.GROUP + "/" + APP_NAME)
            .action("StatusUpdate").reason(event.reason())
            .note(event.message());
        K8s.createEvent(apiClient, vmDef, evt);
    }

    /**
     * Derives the "user logged in" condition. The condition is
     * true only if a user is requested by the configuration, the
     * VMOP agent is connected and the user recorded in the status
     * equals the requested user. In all other cases the condition is
     * set to false with a reason that describes the missing
     * prerequisite.
     *
     * @param from the VM definition to update
     */
    private void updateUserLoggedIn(VmDefinition from) {
        if (loggedInUser == null) {
            updateCondition(from, Condition.USER_LOGGED_IN, false,
                Reason.NOT_REQUESTED, "No user to be logged in");
            return;
        }
        if (!from.conditionStatus(Condition.VMOP_AGENT).orElse(false)) {
            updateCondition(from, Condition.USER_LOGGED_IN, false,
                "VmopAgentDisconnected", "Waiting for VMOP agent to connect");
            return;
        }
        if (!from.fromStatus(Status.LOGGED_IN_USER).map(loggedInUser::equals)
            .orElse(false)) {
            updateCondition(from, Condition.USER_LOGGED_IN, false,
                "Processing", "Waiting for user to be logged in");
            // Must not fall through, else the condition would
            // unconditionally be overwritten with "logged in".
            return;
        }
        updateCondition(from, Condition.USER_LOGGED_IN, true,
            Reason.LOGGED_IN, "User is logged in");
    }

    /**
     * Update the current RAM size in the status. Balloon changes happen
     * more than once every second during changes. While this is nice
     * to watch, this puts a heavy load on the system. Therefore we
     * only update the status once every 15 seconds or when the target
     * value is reached.
     *
     * A change that is not written immediately is remembered and
     * written by a timer that fires 15 seconds after the first
     * deferred change.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onBallonChange(BalloonChangeEvent event) throws ApiException {
        if (vmStub == null) {
            return;
        }
        Instant now = Instant.now();
        if (lastRamChange == null
            || lastRamChange.isBefore(now.minusSeconds(15))
            || event.size().equals(targetRamValue)) {
            // Immediate update supersedes a pending deferred update
            if (balloonTimer != null) {
                balloonTimer.cancel();
                balloonTimer = null;
            }
            lastRamChange = now;
            lastRamValue = event.size();
            updateRam();
            return;
        }

        // Save for later processing and maybe start timer
        lastRamChange = now;
        lastRamValue = event.size();
        if (balloonTimer != null) {
            return;
        }
        final var pipeline = activeEventPipeline();
        balloonTimer = Components.schedule(t -> {
            // Do the update on the event pipeline that was active
            // when the timer was created.
            pipeline.submit("Update RAM size", () -> {
                try {
                    updateRam();
                } catch (ApiException e) {
                    logger.log(Level.WARNING, e,
                        () -> "Failed to update ram size: " + e.getMessage());
                }
                balloonTimer = null;
            });
        }, now.plusSeconds(15));
    }

    /**
     * Writes the last reported balloon size to the status, formatted
     * as a quantity with binary SI suffix.
     *
     * @throws ApiException if updating the status fails
     */
    private void updateRam() throws ApiException {
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            status.addProperty(Status.RAM,
                new Quantity(new BigDecimal(lastRamValue), Format.BINARY_SI)
                    .toSuffixedString());
            return status;
        });
    }

    /**
     * On CPU change. Writes the number of used CPUs to the status.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onCpuChange(HotpluggableCpuStatus event) throws ApiException {
        if (vmStub == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            status.addProperty(Status.CPUS, event.usedCpus().size());
            return status;
        });
    }

    /**
     * On display password changed. Increments the display password
     * serial in the status.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onDisplayPasswordChanged(DisplayPasswordChanged event)
            throws ApiException {
        if (vmStub == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            // NOTE(review): assumes that the serial is already present
            // in the status, get(...) returns null otherwise -- confirm
            // that it is always initialized before this event occurs.
            status.addProperty(Status.DISPLAY_PASSWORD_SERIAL,
                status.get(Status.DISPLAY_PASSWORD_SERIAL).getAsLong() + 1);
            return status;
        });
    }

    /**
     * On shutdown. Remembers whether the shutdown was initiated by
     * the guest, which is evaluated in {@link #onRunnerStateChanged}.
     *
     * @param event the event
     * @throws ApiException the api exception
     */
    @Handler
    public void onShutdown(ShutdownEvent event) throws ApiException {
        shutdownByGuest = event.byGuest();
    }

    /**
     * On osinfo. Converts the reported OS information to JSON and
     * stores it in the status.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onOsinfo(OsinfoEvent event) throws ApiException {
        if (vmStub == null) {
            return;
        }
        var asGson = gson.toJsonTree(
            objectMapper.convertValue(event.osinfo(), Object.class));
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            status.add(Status.OSINFO, asGson);
            return status;
        });

    }

    /**
     * On VMOP agent connected. Sets the "VmopAgentConnected" condition
     * to true and re-evaluates the "user logged in" condition.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    @SuppressWarnings("PMD.AssignmentInOperand")
    public void onVmopAgentConnected(VmopAgentConnected event)
            throws ApiException {
        VmDefinition vmDef;
        if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            var status = updateCondition(vmDef, "VmopAgentConnected",
                true, "VmopAgentStarted", "The VM operator agent is running");
            updateUserLoggedIn(from);
            return status;
        }, vmDef);
    }

    /**
     * On VMOP agent logged in. Records the user from the triggering
     * event as logged in user in the status and re-evaluates the
     * "user logged in" condition.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onVmopAgentLoggedIn(VmopAgentLoggedIn event)
            throws ApiException {
        // Applies only if we have a connection to k8s.
        if (vmStub == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            status.addProperty(Status.LOGGED_IN_USER,
                event.triggering().user());
            updateUserLoggedIn(from);
            return status;
        });
    }

    /**
     * On VMOP agent logged out. Removes the logged in user from the
     * status and re-evaluates the "user logged in" condition.
     *
     * @param event the event
     * @throws ApiException if updating the status fails
     */
    @Handler
    public void onVmopAgentLoggedOut(VmopAgentLoggedOut event)
            throws ApiException {
        // Applies only if we have a connection to k8s.
        if (vmStub == null) {
            return;
        }
        vmStub.updateStatus(from -> {
            JsonObject status = from.statusJson();
            status.remove(Status.LOGGED_IN_USER);
            updateUserLoggedIn(from);
            return status;
        });
    }
}