001/*
002 * VM-Operator
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.openapi.ApiException;
022import io.kubernetes.client.openapi.models.V1Pod;
023import io.kubernetes.client.openapi.models.V1PodList;
024import io.kubernetes.client.util.Watch.Response;
025import io.kubernetes.client.util.generic.options.ListOptions;
026import java.io.IOException;
027import java.time.Duration;
028import java.time.Instant;
029import java.util.Map;
030import java.util.Optional;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.logging.Level;
033import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
034import static org.jdrupes.vmoperator.common.Constants.VM_OP_NAME;
035import org.jdrupes.vmoperator.common.K8sClient;
036import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
037import org.jdrupes.vmoperator.common.K8sV1PodStub;
038import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
039import org.jdrupes.vmoperator.manager.events.PodChanged;
040import org.jdrupes.vmoperator.manager.events.VmChannel;
041import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
042import org.jgrapes.core.Channel;
043import org.jgrapes.core.annotation.Handler;
044
045/**
046 * Watches for changes of pods that run VMs.
047 */
048public class PodMonitor extends AbstractMonitor<V1Pod, V1PodList, VmChannel> {
049
050    private final ChannelDictionary<String, VmChannel, ?> channelDictionary;
051
052    private final Map<String, PendingChange> pendingChanges
053        = new ConcurrentHashMap<>();
054
055    /**
056     * Instantiates a new pod monitor.
057     *
058     * @param componentChannel the component channel
059     * @param channelDictionary the channel dictionary
060     */
061    public PodMonitor(Channel componentChannel,
062            ChannelDictionary<String, VmChannel, ?> channelDictionary) {
063        super(componentChannel, V1Pod.class, V1PodList.class);
064        this.channelDictionary = channelDictionary;
065        context(K8sV1PodStub.CONTEXT);
066        ListOptions options = new ListOptions();
067        options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
068            + "app.kubernetes.io/component=" + APP_NAME + ","
069            + "app.kubernetes.io/managed-by=" + VM_OP_NAME);
070        options(options);
071    }
072
073    @Override
074    protected void prepareMonitoring() throws IOException, ApiException {
075        client(new K8sClient());
076    }
077
078    @Override
079    protected void handleChange(K8sClient client, Response<V1Pod> change) {
080        String vmName = change.object.getMetadata().getLabels()
081            .get("app.kubernetes.io/instance");
082        if (vmName == null) {
083            return;
084        }
085        var channel = channelDictionary.channel(vmName).orElse(null);
086        var responseType = ResponseType.valueOf(change.type);
087        if (channel != null && channel.vmDefinition() != null) {
088            pendingChanges.remove(vmName);
089            channel.fire(new PodChanged(change.object, responseType));
090            return;
091        }
092
093        // VM definition not available yet, may happen during startup
094        if (responseType == ResponseType.DELETED) {
095            return;
096        }
097        purgePendingChanges();
098        logger.finer(() -> "Add pending pod change for " + vmName);
099        pendingChanges.put(vmName, new PendingChange(Instant.now(), change));
100    }
101
102    private void purgePendingChanges() {
103        Instant tooOld = Instant.now().minus(Duration.ofMinutes(15));
104        for (var itr = pendingChanges.entrySet().iterator(); itr.hasNext();) {
105            var change = itr.next();
106            if (change.getValue().from().isBefore(tooOld)) {
107                itr.remove();
108                logger.finer(
109                    () -> "Cleaned pending pod change for " + change.getKey());
110            }
111        }
112    }
113
114    /**
115     * Check for pending changes.
116     *
117     * @param event the event
118     * @param channel the channel
119     */
120    @Handler
121    public void onVmResourceChanged(VmResourceChanged event,
122            VmChannel channel) {
123        Optional.ofNullable(pendingChanges.remove(event.vmDefinition().name()))
124            .map(PendingChange::change).ifPresent(change -> {
125                logger.finer(() -> "Firing pending pod change for "
126                    + event.vmDefinition().name());
127                channel.fire(new PodChanged(change.object,
128                    ResponseType.valueOf(change.type)));
129                if (logger.isLoggable(Level.FINER)
130                    && pendingChanges.isEmpty()) {
131                    logger.finer("No pending pod changes left.");
132                }
133            });
134    }
135
136    private record PendingChange(Instant from, Response<V1Pod> change) {
137    }
138
139}