001/* 002 * VM-Operator 003 * Copyright (C) 2023,2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.databind.ObjectMapper; 022import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; 023import com.google.gson.Gson; 024import com.google.gson.JsonObject; 025import io.kubernetes.client.apimachinery.GroupVersionKind; 026import io.kubernetes.client.custom.Quantity; 027import io.kubernetes.client.custom.Quantity.Format; 028import io.kubernetes.client.custom.V1Patch; 029import io.kubernetes.client.openapi.ApiException; 030import io.kubernetes.client.openapi.JSON; 031import io.kubernetes.client.openapi.models.EventsV1Event; 032import java.io.IOException; 033import java.math.BigDecimal; 034import java.math.BigInteger; 035import java.time.Instant; 036import java.util.Optional; 037import java.util.logging.Level; 038import static org.jdrupes.vmoperator.common.Constants.APP_NAME; 039import org.jdrupes.vmoperator.common.Constants.Crd; 040import org.jdrupes.vmoperator.common.Constants.Status; 041import org.jdrupes.vmoperator.common.Constants.Status.Condition; 042import org.jdrupes.vmoperator.common.Constants.Status.Condition.Reason; 043import org.jdrupes.vmoperator.common.K8s; 044import org.jdrupes.vmoperator.common.VmDefinition; 045import org.jdrupes.vmoperator.common.VmDefinitionStub; 046import org.jdrupes.vmoperator.runner.qemu.events.BalloonChangeEvent; 047import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu; 048import org.jdrupes.vmoperator.runner.qemu.events.DisplayPasswordChanged; 049import org.jdrupes.vmoperator.runner.qemu.events.Exit; 050import org.jdrupes.vmoperator.runner.qemu.events.HotpluggableCpuStatus; 051import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent; 052import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange; 053import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.RunState; 054import org.jdrupes.vmoperator.runner.qemu.events.ShutdownEvent; 055import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentConnected; 056import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedIn; 057import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedOut; 058import org.jdrupes.vmoperator.util.GsonPtr; 059import org.jgrapes.core.Channel; 060import org.jgrapes.core.Components; 061import org.jgrapes.core.Components.Timer; 062import org.jgrapes.core.annotation.Handler; 063import org.jgrapes.core.events.HandlingError; 064import org.jgrapes.core.events.Start; 065 066/** 067 * Updates the CR status. 068 */ 069@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 070 "PMD.CouplingBetweenObjects" }) 071public class StatusUpdater extends VmDefUpdater { 072 073 @SuppressWarnings("PMD.FieldNamingConventions") 074 private static final Gson gson = new JSON().getGson(); 075 @SuppressWarnings("PMD.FieldNamingConventions") 076 private static final ObjectMapper objectMapper 077 = new ObjectMapper().registerModule(new JavaTimeModule()); 078 079 private boolean guestShutdownStops; 080 private boolean shutdownByGuest; 081 private VmDefinitionStub vmStub; 082 private String loggedInUser; 083 private BigInteger lastRamValue; 084 private Instant lastRamChange; 085 private Timer balloonTimer; 086 private BigInteger targetRamValue; 087 088 /** 089 * Instantiates a new status updater. 090 * 091 * @param componentChannel the component channel 092 */ 093 @SuppressWarnings("PMD.ConstructorCallsOverridableMethod") 094 public StatusUpdater(Channel componentChannel) { 095 super(componentChannel); 096 attach(new ConsoleTracker(componentChannel)); 097 } 098 099 /** 100 * On handling error. 101 * 102 * @param event the event 103 */ 104 @Handler(channels = Channel.class) 105 public void onHandlingError(HandlingError event) { 106 if (event.throwable() instanceof ApiException exc) { 107 logger.log(Level.WARNING, exc, 108 () -> "Problem accessing kubernetes: " + exc.getResponseBody()); 109 event.stop(); 110 } 111 } 112 113 /** 114 * Handle the start event. 115 * 116 * @param event the event 117 * @throws IOException 118 * @throws ApiException 119 */ 120 @Handler 121 public void onStart(Start event) { 122 if (namespace == null) { 123 return; 124 } 125 try { 126 vmStub = VmDefinitionStub.get(apiClient, 127 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 128 namespace, vmName); 129 var vmDef = vmStub.model().orElse(null); 130 if (vmDef == null) { 131 return; 132 } 133 vmStub.updateStatus(from -> { 134 JsonObject status = from.statusJson(); 135 status.addProperty(Status.RUNNER_VERSION, Optional.ofNullable( 136 Runner.class.getPackage().getImplementationVersion()) 137 .orElse("(unknown)")); 138 status.remove(Status.LOGGED_IN_USER); 139 return status; 140 }); 141 } catch (ApiException e) { 142 logger.log(Level.SEVERE, e, 143 () -> "Cannot access VM object, terminating."); 144 event.cancel(true); 145 fire(new Exit(1)); 146 } 147 } 148 149 /** 150 * On runner configuration update. 151 * 152 * @param event the event 153 * @throws ApiException 154 */ 155 @Handler 156 @SuppressWarnings("PMD.AvoidDuplicateLiterals") 157 public void onConfigureQemu(ConfigureQemu event) 158 throws ApiException { 159 guestShutdownStops = event.configuration().guestShutdownStops; 160 loggedInUser = event.configuration().vm.display.loggedInUser; 161 targetRamValue = event.configuration().vm.currentRam; 162 163 // Remainder applies only if we have a connection to k8s. 164 if (vmStub == null) { 165 return; 166 } 167 vmStub.updateStatus(from -> { 168 JsonObject status = from.statusJson(); 169 if (!event.configuration().hasDisplayPassword) { 170 status.addProperty(Status.DISPLAY_PASSWORD_SERIAL, -1); 171 } 172 status.getAsJsonArray("conditions").asList().stream() 173 .map(cond -> (JsonObject) cond) 174 .filter(cond -> Condition.RUNNING 175 .equals(cond.get("type").getAsString())) 176 .forEach(cond -> cond.addProperty("observedGeneration", 177 from.getMetadata().getGeneration())); 178 updateUserLoggedIn(from); 179 return status; 180 }); 181 } 182 183 /** 184 * On runner state changed. 185 * 186 * @param event the event 187 * @throws ApiException 188 */ 189 @Handler 190 @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", 191 "PMD.AssignmentInOperand", "PMD.AvoidDuplicateLiterals" }) 192 public void onRunnerStateChanged(RunnerStateChange event) 193 throws ApiException { 194 VmDefinition vmDef; 195 if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) { 196 return; 197 } 198 vmStub.updateStatus(from -> { 199 boolean running = event.runState().vmRunning(); 200 updateCondition(vmDef, Condition.RUNNING, running, event.reason(), 201 event.message()); 202 JsonObject status = updateCondition(vmDef, Condition.BOOTED, 203 event.runState() == RunState.BOOTED, event.reason(), 204 event.message()); 205 if (event.runState() == RunState.STARTING) { 206 status.addProperty(Status.RAM, GsonPtr.to(from.data()) 207 .getAsString("spec", "vm", "maximumRam").orElse("0")); 208 status.addProperty(Status.CPUS, 1); 209 } else if (event.runState() == RunState.STOPPED) { 210 status.addProperty(Status.RAM, "0"); 211 status.addProperty(Status.CPUS, 0); 212 status.remove(Status.LOGGED_IN_USER); 213 } 214 215 if (!running) { 216 // In case console connection was still present 217 status.addProperty(Status.CONSOLE_CLIENT, ""); 218 updateCondition(from, Condition.CONSOLE_CONNECTED, false, 219 "VmStopped", 220 "The VM is not running"); 221 222 // In case we had an irregular shutdown 223 updateCondition(from, Condition.USER_LOGGED_IN, false, 224 "VmStopped", "The VM is not running"); 225 status.remove(Status.OSINFO); 226 updateCondition(vmDef, "VmopAgentConnected", false, "VmStopped", 227 "The VM is not running"); 228 } 229 return status; 230 }, vmDef); 231 232 // Maybe stop VM 233 if (event.runState() == RunState.TERMINATING && !event.failed() 234 && guestShutdownStops && shutdownByGuest) { 235 logger.info(() -> "Stopping VM because of shutdown by guest."); 236 var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH, 237 new V1Patch("[{\"op\": \"replace\", \"path\": \"/spec/vm/state" 238 + "\", \"value\": \"Stopped\"}]"), 239 apiClient.defaultPatchOptions()); 240 if (!res.isPresent()) { 241 logger.warning( 242 () -> "Cannot patch pod annotations for: " + vmStub.name()); 243 } 244 } 245 246 // Log event 247 var evt = new EventsV1Event() 248 .reportingController(Crd.GROUP + "/" + APP_NAME) 249 .action("StatusUpdate").reason(event.reason()) 250 .note(event.message()); 251 K8s.createEvent(apiClient, vmDef, evt); 252 } 253 254 private void updateUserLoggedIn(VmDefinition from) { 255 if (loggedInUser == null) { 256 updateCondition(from, Condition.USER_LOGGED_IN, false, 257 Reason.NOT_REQUESTED, "No user to be logged in"); 258 return; 259 } 260 if (!from.conditionStatus(Condition.VMOP_AGENT).orElse(false)) { 261 updateCondition(from, Condition.USER_LOGGED_IN, false, 262 "VmopAgentDisconnected", "Waiting for VMOP agent to connect"); 263 return; 264 } 265 if (!from.fromStatus(Status.LOGGED_IN_USER).map(loggedInUser::equals) 266 .orElse(false)) { 267 updateCondition(from, Condition.USER_LOGGED_IN, false, 268 "Processing", "Waiting for user to be logged in"); 269 } 270 updateCondition(from, Condition.USER_LOGGED_IN, true, 271 Reason.LOGGED_IN, "User is logged in"); 272 } 273 274 /** 275 * Update the current RAM size in the status. Balloon changes happen 276 * more than once every second during changes. While this is nice 277 * to watch, this puts a heavy load on the system. Therefore we 278 * only update the status once every 15 seconds or when the target 279 * value is reached. 280 * 281 * @param event the event 282 * @throws ApiException 283 */ 284 @Handler 285 public void onBallonChange(BalloonChangeEvent event) throws ApiException { 286 if (vmStub == null) { 287 return; 288 } 289 Instant now = Instant.now(); 290 if (lastRamChange == null 291 || lastRamChange.isBefore(now.minusSeconds(15)) 292 || event.size().equals(targetRamValue)) { 293 if (balloonTimer != null) { 294 balloonTimer.cancel(); 295 balloonTimer = null; 296 } 297 lastRamChange = now; 298 lastRamValue = event.size(); 299 updateRam(); 300 return; 301 } 302 303 // Save for later processing and maybe start timer 304 lastRamChange = now; 305 lastRamValue = event.size(); 306 if (balloonTimer != null) { 307 return; 308 } 309 final var pipeline = activeEventPipeline(); 310 balloonTimer = Components.schedule(t -> { 311 pipeline.submit("Update RAM size", () -> { 312 try { 313 updateRam(); 314 } catch (ApiException e) { 315 logger.log(Level.WARNING, e, 316 () -> "Failed to update ram size: " + e.getMessage()); 317 } 318 balloonTimer = null; 319 }); 320 }, now.plusSeconds(15)); 321 } 322 323 private void updateRam() throws ApiException { 324 vmStub.updateStatus(from -> { 325 JsonObject status = from.statusJson(); 326 status.addProperty(Status.RAM, 327 new Quantity(new BigDecimal(lastRamValue), Format.BINARY_SI) 328 .toSuffixedString()); 329 return status; 330 }); 331 } 332 333 /** 334 * On ballon change. 335 * 336 * @param event the event 337 * @throws ApiException 338 */ 339 @Handler 340 public void onCpuChange(HotpluggableCpuStatus event) throws ApiException { 341 if (vmStub == null) { 342 return; 343 } 344 vmStub.updateStatus(from -> { 345 JsonObject status = from.statusJson(); 346 status.addProperty(Status.CPUS, event.usedCpus().size()); 347 return status; 348 }); 349 } 350 351 /** 352 * On ballon change. 353 * 354 * @param event the event 355 * @throws ApiException 356 */ 357 @Handler 358 public void onDisplayPasswordChanged(DisplayPasswordChanged event) 359 throws ApiException { 360 if (vmStub == null) { 361 return; 362 } 363 vmStub.updateStatus(from -> { 364 JsonObject status = from.statusJson(); 365 status.addProperty(Status.DISPLAY_PASSWORD_SERIAL, 366 status.get(Status.DISPLAY_PASSWORD_SERIAL).getAsLong() + 1); 367 return status; 368 }); 369 } 370 371 /** 372 * On shutdown. 373 * 374 * @param event the event 375 * @throws ApiException the api exception 376 */ 377 @Handler 378 public void onShutdown(ShutdownEvent event) throws ApiException { 379 shutdownByGuest = event.byGuest(); 380 } 381 382 /** 383 * On osinfo. 384 * 385 * @param event the event 386 * @throws ApiException 387 */ 388 @Handler 389 public void onOsinfo(OsinfoEvent event) throws ApiException { 390 if (vmStub == null) { 391 return; 392 } 393 var asGson = gson.toJsonTree( 394 objectMapper.convertValue(event.osinfo(), Object.class)); 395 vmStub.updateStatus(from -> { 396 JsonObject status = from.statusJson(); 397 status.add(Status.OSINFO, asGson); 398 return status; 399 }); 400 401 } 402 403 /** 404 * @param event the event 405 * @throws ApiException 406 */ 407 @Handler 408 @SuppressWarnings("PMD.AssignmentInOperand") 409 public void onVmopAgentConnected(VmopAgentConnected event) 410 throws ApiException { 411 VmDefinition vmDef; 412 if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) { 413 return; 414 } 415 vmStub.updateStatus(from -> { 416 var status = updateCondition(vmDef, "VmopAgentConnected", 417 true, "VmopAgentStarted", "The VM operator agent is running"); 418 updateUserLoggedIn(from); 419 return status; 420 }, vmDef); 421 } 422 423 /** 424 * @param event the event 425 * @throws ApiException 426 */ 427 @Handler 428 @SuppressWarnings("PMD.AssignmentInOperand") 429 public void onVmopAgentLoggedIn(VmopAgentLoggedIn event) 430 throws ApiException { 431 vmStub.updateStatus(from -> { 432 JsonObject status = from.statusJson(); 433 status.addProperty(Status.LOGGED_IN_USER, 434 event.triggering().user()); 435 updateUserLoggedIn(from); 436 return status; 437 }); 438 } 439 440 /** 441 * @param event the event 442 * @throws ApiException 443 */ 444 @Handler 445 @SuppressWarnings("PMD.AssignmentInOperand") 446 public void onVmopAgentLoggedOut(VmopAgentLoggedOut event) 447 throws ApiException { 448 vmStub.updateStatus(from -> { 449 JsonObject status = from.statusJson(); 450 status.remove(Status.LOGGED_IN_USER); 451 updateUserLoggedIn(from); 452 return status; 453 }); 454 } 455}