001/*
002 * VM-Operator
003 * Copyright (C) 2023,2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.databind.ObjectMapper;
022import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
023import com.google.gson.Gson;
024import com.google.gson.JsonObject;
025import io.kubernetes.client.apimachinery.GroupVersionKind;
026import io.kubernetes.client.custom.Quantity;
027import io.kubernetes.client.custom.Quantity.Format;
028import io.kubernetes.client.custom.V1Patch;
029import io.kubernetes.client.openapi.ApiException;
030import io.kubernetes.client.openapi.JSON;
031import io.kubernetes.client.openapi.models.EventsV1Event;
032import java.io.IOException;
033import java.math.BigDecimal;
034import java.math.BigInteger;
035import java.time.Instant;
036import java.util.Optional;
037import java.util.logging.Level;
038import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
039import org.jdrupes.vmoperator.common.Constants.Crd;
040import org.jdrupes.vmoperator.common.Constants.Status;
041import org.jdrupes.vmoperator.common.Constants.Status.Condition;
042import org.jdrupes.vmoperator.common.Constants.Status.Condition.Reason;
043import org.jdrupes.vmoperator.common.K8s;
044import org.jdrupes.vmoperator.common.VmDefinition;
045import org.jdrupes.vmoperator.common.VmDefinitionStub;
046import org.jdrupes.vmoperator.runner.qemu.events.BalloonChangeEvent;
047import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
048import org.jdrupes.vmoperator.runner.qemu.events.DisplayPasswordChanged;
049import org.jdrupes.vmoperator.runner.qemu.events.Exit;
050import org.jdrupes.vmoperator.runner.qemu.events.HotpluggableCpuStatus;
051import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent;
052import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange;
053import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.RunState;
054import org.jdrupes.vmoperator.runner.qemu.events.ShutdownEvent;
055import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentConnected;
056import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedIn;
057import org.jdrupes.vmoperator.runner.qemu.events.VmopAgentLoggedOut;
058import org.jdrupes.vmoperator.util.GsonPtr;
059import org.jgrapes.core.Channel;
060import org.jgrapes.core.Components;
061import org.jgrapes.core.Components.Timer;
062import org.jgrapes.core.annotation.Handler;
063import org.jgrapes.core.events.HandlingError;
064import org.jgrapes.core.events.Start;
065
066/**
067 * Updates the CR status.
068 */
069@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
070    "PMD.CouplingBetweenObjects" })
071public class StatusUpdater extends VmDefUpdater {
072
073    @SuppressWarnings("PMD.FieldNamingConventions")
074    private static final Gson gson = new JSON().getGson();
075    @SuppressWarnings("PMD.FieldNamingConventions")
076    private static final ObjectMapper objectMapper
077        = new ObjectMapper().registerModule(new JavaTimeModule());
078
079    private boolean guestShutdownStops;
080    private boolean shutdownByGuest;
081    private VmDefinitionStub vmStub;
082    private String loggedInUser;
083    private BigInteger lastRamValue;
084    private Instant lastRamChange;
085    private Timer balloonTimer;
086    private BigInteger targetRamValue;
087
088    /**
089     * Instantiates a new status updater.
090     *
091     * @param componentChannel the component channel
092     */
093    @SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
094    public StatusUpdater(Channel componentChannel) {
095        super(componentChannel);
096        attach(new ConsoleTracker(componentChannel));
097    }
098
099    /**
100     * On handling error.
101     *
102     * @param event the event
103     */
104    @Handler(channels = Channel.class)
105    public void onHandlingError(HandlingError event) {
106        if (event.throwable() instanceof ApiException exc) {
107            logger.log(Level.WARNING, exc,
108                () -> "Problem accessing kubernetes: " + exc.getResponseBody());
109            event.stop();
110        }
111    }
112
113    /**
114     * Handle the start event.
115     *
116     * @param event the event
117     * @throws IOException 
118     * @throws ApiException 
119     */
120    @Handler
121    public void onStart(Start event) {
122        if (namespace == null) {
123            return;
124        }
125        try {
126            vmStub = VmDefinitionStub.get(apiClient,
127                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
128                namespace, vmName);
129            var vmDef = vmStub.model().orElse(null);
130            if (vmDef == null) {
131                return;
132            }
133            vmStub.updateStatus(from -> {
134                JsonObject status = from.statusJson();
135                status.addProperty(Status.RUNNER_VERSION, Optional.ofNullable(
136                    Runner.class.getPackage().getImplementationVersion())
137                    .orElse("(unknown)"));
138                status.remove(Status.LOGGED_IN_USER);
139                return status;
140            });
141        } catch (ApiException e) {
142            logger.log(Level.SEVERE, e,
143                () -> "Cannot access VM object, terminating.");
144            event.cancel(true);
145            fire(new Exit(1));
146        }
147    }
148
149    /**
150     * On runner configuration update.
151     *
152     * @param event the event
153     * @throws ApiException 
154     */
155    @Handler
156    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
157    public void onConfigureQemu(ConfigureQemu event)
158            throws ApiException {
159        guestShutdownStops = event.configuration().guestShutdownStops;
160        loggedInUser = event.configuration().vm.display.loggedInUser;
161        targetRamValue = event.configuration().vm.currentRam;
162
163        // Remainder applies only if we have a connection to k8s.
164        if (vmStub == null) {
165            return;
166        }
167        vmStub.updateStatus(from -> {
168            JsonObject status = from.statusJson();
169            if (!event.configuration().hasDisplayPassword) {
170                status.addProperty(Status.DISPLAY_PASSWORD_SERIAL, -1);
171            }
172            status.getAsJsonArray("conditions").asList().stream()
173                .map(cond -> (JsonObject) cond)
174                .filter(cond -> Condition.RUNNING
175                    .equals(cond.get("type").getAsString()))
176                .forEach(cond -> cond.addProperty("observedGeneration",
177                    from.getMetadata().getGeneration()));
178            updateUserLoggedIn(from);
179            return status;
180        });
181    }
182
183    /**
184     * On runner state changed.
185     *
186     * @param event the event
187     * @throws ApiException 
188     */
189    @Handler
190    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
191        "PMD.AssignmentInOperand", "PMD.AvoidDuplicateLiterals" })
192    public void onRunnerStateChanged(RunnerStateChange event)
193            throws ApiException {
194        VmDefinition vmDef;
195        if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) {
196            return;
197        }
198        vmStub.updateStatus(from -> {
199            boolean running = event.runState().vmRunning();
200            updateCondition(vmDef, Condition.RUNNING, running, event.reason(),
201                event.message());
202            JsonObject status = updateCondition(vmDef, Condition.BOOTED,
203                event.runState() == RunState.BOOTED, event.reason(),
204                event.message());
205            if (event.runState() == RunState.STARTING) {
206                status.addProperty(Status.RAM, GsonPtr.to(from.data())
207                    .getAsString("spec", "vm", "maximumRam").orElse("0"));
208                status.addProperty(Status.CPUS, 1);
209            } else if (event.runState() == RunState.STOPPED) {
210                status.addProperty(Status.RAM, "0");
211                status.addProperty(Status.CPUS, 0);
212                status.remove(Status.LOGGED_IN_USER);
213            }
214
215            if (!running) {
216                // In case console connection was still present
217                status.addProperty(Status.CONSOLE_CLIENT, "");
218                updateCondition(from, Condition.CONSOLE_CONNECTED, false,
219                    "VmStopped",
220                    "The VM is not running");
221
222                // In case we had an irregular shutdown
223                updateCondition(from, Condition.USER_LOGGED_IN, false,
224                    "VmStopped", "The VM is not running");
225                status.remove(Status.OSINFO);
226                updateCondition(vmDef, "VmopAgentConnected", false, "VmStopped",
227                    "The VM is not running");
228            }
229            return status;
230        }, vmDef);
231
232        // Maybe stop VM
233        if (event.runState() == RunState.TERMINATING && !event.failed()
234            && guestShutdownStops && shutdownByGuest) {
235            logger.info(() -> "Stopping VM because of shutdown by guest.");
236            var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
237                new V1Patch("[{\"op\": \"replace\", \"path\": \"/spec/vm/state"
238                    + "\", \"value\": \"Stopped\"}]"),
239                apiClient.defaultPatchOptions());
240            if (!res.isPresent()) {
241                logger.warning(
242                    () -> "Cannot patch pod annotations for: " + vmStub.name());
243            }
244        }
245
246        // Log event
247        var evt = new EventsV1Event()
248            .reportingController(Crd.GROUP + "/" + APP_NAME)
249            .action("StatusUpdate").reason(event.reason())
250            .note(event.message());
251        K8s.createEvent(apiClient, vmDef, evt);
252    }
253
254    private void updateUserLoggedIn(VmDefinition from) {
255        if (loggedInUser == null) {
256            updateCondition(from, Condition.USER_LOGGED_IN, false,
257                Reason.NOT_REQUESTED, "No user to be logged in");
258            return;
259        }
260        if (!from.conditionStatus(Condition.VMOP_AGENT).orElse(false)) {
261            updateCondition(from, Condition.USER_LOGGED_IN, false,
262                "VmopAgentDisconnected", "Waiting for VMOP agent to connect");
263            return;
264        }
265        if (!from.fromStatus(Status.LOGGED_IN_USER).map(loggedInUser::equals)
266            .orElse(false)) {
267            updateCondition(from, Condition.USER_LOGGED_IN, false,
268                "Processing", "Waiting for user to be logged in");
269        }
270        updateCondition(from, Condition.USER_LOGGED_IN, true,
271            Reason.LOGGED_IN, "User is logged in");
272    }
273
274    /**
275     * Update the current RAM size in the status. Balloon changes happen
276     * more than once every second during changes. While this is nice
277     * to watch, this puts a heavy load on the system. Therefore we
278     * only update the status once every 15 seconds or when the target
279     * value is reached.
280     *
281     * @param event the event
282     * @throws ApiException 
283     */
284    @Handler
285    public void onBallonChange(BalloonChangeEvent event) throws ApiException {
286        if (vmStub == null) {
287            return;
288        }
289        Instant now = Instant.now();
290        if (lastRamChange == null
291            || lastRamChange.isBefore(now.minusSeconds(15))
292            || event.size().equals(targetRamValue)) {
293            if (balloonTimer != null) {
294                balloonTimer.cancel();
295                balloonTimer = null;
296            }
297            lastRamChange = now;
298            lastRamValue = event.size();
299            updateRam();
300            return;
301        }
302
303        // Save for later processing and maybe start timer
304        lastRamChange = now;
305        lastRamValue = event.size();
306        if (balloonTimer != null) {
307            return;
308        }
309        final var pipeline = activeEventPipeline();
310        balloonTimer = Components.schedule(t -> {
311            pipeline.submit("Update RAM size", () -> {
312                try {
313                    updateRam();
314                } catch (ApiException e) {
315                    logger.log(Level.WARNING, e,
316                        () -> "Failed to update ram size: " + e.getMessage());
317                }
318                balloonTimer = null;
319            });
320        }, now.plusSeconds(15));
321    }
322
323    private void updateRam() throws ApiException {
324        vmStub.updateStatus(from -> {
325            JsonObject status = from.statusJson();
326            status.addProperty(Status.RAM,
327                new Quantity(new BigDecimal(lastRamValue), Format.BINARY_SI)
328                    .toSuffixedString());
329            return status;
330        });
331    }
332
333    /**
334     * On ballon change.
335     *
336     * @param event the event
337     * @throws ApiException 
338     */
339    @Handler
340    public void onCpuChange(HotpluggableCpuStatus event) throws ApiException {
341        if (vmStub == null) {
342            return;
343        }
344        vmStub.updateStatus(from -> {
345            JsonObject status = from.statusJson();
346            status.addProperty(Status.CPUS, event.usedCpus().size());
347            return status;
348        });
349    }
350
351    /**
352     * On ballon change.
353     *
354     * @param event the event
355     * @throws ApiException 
356     */
357    @Handler
358    public void onDisplayPasswordChanged(DisplayPasswordChanged event)
359            throws ApiException {
360        if (vmStub == null) {
361            return;
362        }
363        vmStub.updateStatus(from -> {
364            JsonObject status = from.statusJson();
365            status.addProperty(Status.DISPLAY_PASSWORD_SERIAL,
366                status.get(Status.DISPLAY_PASSWORD_SERIAL).getAsLong() + 1);
367            return status;
368        });
369    }
370
371    /**
372     * On shutdown.
373     *
374     * @param event the event
375     * @throws ApiException the api exception
376     */
377    @Handler
378    public void onShutdown(ShutdownEvent event) throws ApiException {
379        shutdownByGuest = event.byGuest();
380    }
381
382    /**
383     * On osinfo.
384     *
385     * @param event the event
386     * @throws ApiException 
387     */
388    @Handler
389    public void onOsinfo(OsinfoEvent event) throws ApiException {
390        if (vmStub == null) {
391            return;
392        }
393        var asGson = gson.toJsonTree(
394            objectMapper.convertValue(event.osinfo(), Object.class));
395        vmStub.updateStatus(from -> {
396            JsonObject status = from.statusJson();
397            status.add(Status.OSINFO, asGson);
398            return status;
399        });
400
401    }
402
403    /**
404     * @param event the event
405     * @throws ApiException 
406     */
407    @Handler
408    @SuppressWarnings("PMD.AssignmentInOperand")
409    public void onVmopAgentConnected(VmopAgentConnected event)
410            throws ApiException {
411        VmDefinition vmDef;
412        if (vmStub == null || (vmDef = vmStub.model().orElse(null)) == null) {
413            return;
414        }
415        vmStub.updateStatus(from -> {
416            var status = updateCondition(vmDef, "VmopAgentConnected",
417                true, "VmopAgentStarted", "The VM operator agent is running");
418            updateUserLoggedIn(from);
419            return status;
420        }, vmDef);
421    }
422
423    /**
424     * @param event the event
425     * @throws ApiException 
426     */
427    @Handler
428    @SuppressWarnings("PMD.AssignmentInOperand")
429    public void onVmopAgentLoggedIn(VmopAgentLoggedIn event)
430            throws ApiException {
431        vmStub.updateStatus(from -> {
432            JsonObject status = from.statusJson();
433            status.addProperty(Status.LOGGED_IN_USER,
434                event.triggering().user());
435            updateUserLoggedIn(from);
436            return status;
437        });
438    }
439
440    /**
441     * @param event the event
442     * @throws ApiException 
443     */
444    @Handler
445    @SuppressWarnings("PMD.AssignmentInOperand")
446    public void onVmopAgentLoggedOut(VmopAgentLoggedOut event)
447            throws ApiException {
448        vmStub.updateStatus(from -> {
449            JsonObject status = from.statusJson();
450            status.remove(Status.LOGGED_IN_USER);
451            updateUserLoggedIn(from);
452            return status;
453        });
454    }
455}