001/*
002 * VM-Operator
003 * Copyright (C) 2023,2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import io.kubernetes.client.openapi.ApiException;
022import io.kubernetes.client.openapi.models.V1ObjectMeta;
023import io.kubernetes.client.util.Watch;
024import io.kubernetes.client.util.generic.options.ListOptions;
025import java.io.IOException;
026import java.time.Instant;
027import java.util.ArrayList;
028import java.util.Comparator;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.logging.Level;
033import java.util.stream.Collectors;
034import org.jdrupes.vmoperator.common.Constants.Crd;
035import org.jdrupes.vmoperator.common.K8s;
036import org.jdrupes.vmoperator.common.K8sClient;
037import org.jdrupes.vmoperator.common.K8sDynamicStub;
038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
039import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
040import org.jdrupes.vmoperator.common.K8sV1PodStub;
041import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
042import org.jdrupes.vmoperator.common.VmDefinition;
043import org.jdrupes.vmoperator.common.VmDefinitionStub;
044import org.jdrupes.vmoperator.common.VmDefinitions;
045import org.jdrupes.vmoperator.common.VmExtraData;
046import org.jdrupes.vmoperator.common.VmPool;
047import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
049import org.jdrupes.vmoperator.manager.events.AssignVm;
050import org.jdrupes.vmoperator.manager.events.ChannelManager;
051import org.jdrupes.vmoperator.manager.events.GetPools;
052import org.jdrupes.vmoperator.manager.events.GetVms;
053import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
054import org.jdrupes.vmoperator.manager.events.ModifyVm;
055import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
056import org.jdrupes.vmoperator.manager.events.VmChannel;
057import org.jdrupes.vmoperator.manager.events.VmDefChanged;
058import org.jgrapes.core.Channel;
059import org.jgrapes.core.Event;
060import org.jgrapes.core.annotation.Handler;
061
062/**
063 * Watches for changes of VM definitions.
064 */
065@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
066public class VmMonitor extends
067        AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> {
068
069    private final ChannelManager<String, VmChannel, ?> channelManager;
070
071    /**
072     * Instantiates a new VM definition watcher.
073     *
074     * @param componentChannel the component channel
075     * @param channelManager the channel manager
076     */
077    public VmMonitor(Channel componentChannel,
078            ChannelManager<String, VmChannel, ?> channelManager) {
079        super(componentChannel, VmDefinition.class,
080            VmDefinitions.class);
081        this.channelManager = channelManager;
082    }
083
084    @Override
085    protected void prepareMonitoring() throws IOException, ApiException {
086        client(new K8sClient());
087
088        // Get all our API versions
089        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM);
090        if (ctx.isEmpty()) {
091            logger.severe(() -> "Cannot get CRD context.");
092            return;
093        }
094        context(ctx.get());
095
096        // Remove left over resources
097        purge();
098    }
099
100    @SuppressWarnings("PMD.CognitiveComplexity")
101    private void purge() throws ApiException {
102        // Get existing CRs (VMs)
103        var known = K8sDynamicStub.list(client(), context(), namespace())
104            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
105        ListOptions opts = new ListOptions();
106        opts.setLabelSelector(
107            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
108                + "app.kubernetes.io/name=" + APP_NAME);
109        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
110            K8sV1ConfigMapStub.CONTEXT)) {
111            for (var resStub : K8sDynamicStub.list(client(), context,
112                namespace(), opts)) {
113                String instance = resStub.model()
114                    .map(m -> m.metadata().getName()).orElse("(unknown)");
115                if (!known.contains(instance)) {
116                    resStub.delete();
117                }
118            }
119        }
120    }
121
122    @Override
123    protected void handleChange(K8sClient client,
124            Watch.Response<VmDefinition> response) {
125        V1ObjectMeta metadata = response.object.getMetadata();
126        AtomicBoolean toBeAdded = new AtomicBoolean(false);
127        VmChannel channel = channelManager.channel(metadata.getName())
128            .orElseGet(() -> {
129                toBeAdded.set(true);
130                return channelManager.createChannel(metadata.getName());
131            });
132
133        // Get full definition and associate with channel as backup
134        var vmDef = response.object;
135        if (vmDef.data() == null) {
136            // ADDED event does not provide data, see
137            // https://github.com/kubernetes-client/java/issues/3215
138            vmDef = getModel(client, vmDef);
139        }
140        if (vmDef.data() != null) {
141            // New data, augment and save
142            addExtraData(channel.client(), vmDef, channel.vmDefinition());
143            channel.setVmDefinition(vmDef);
144        } else {
145            // Reuse cached (e.g. if deleted)
146            vmDef = channel.vmDefinition();
147        }
148        if (vmDef == null) {
149            logger.warning(() -> "Cannot get defintion for "
150                + response.object.getMetadata());
151            return;
152        }
153        if (toBeAdded.get()) {
154            channelManager.put(vmDef.name(), channel);
155        }
156
157        // Create and fire changed event. Remove channel from channel
158        // manager on completion.
159        VmDefChanged chgEvt
160            = new VmDefChanged(ResponseType.valueOf(response.type),
161                channel.setGeneration(response.object.getMetadata()
162                    .getGeneration()),
163                vmDef);
164        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
165            chgEvt = Event.onCompletion(chgEvt,
166                e -> channelManager.remove(e.vmDefinition().name()));
167        }
168        channel.pipeline().fire(chgEvt, channel);
169    }
170
171    private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
172        try {
173            return VmDefinitionStub.get(client, context(), namespace(),
174                vmDef.metadata().getName()).model().orElse(null);
175        } catch (ApiException e) {
176            return null;
177        }
178    }
179
180    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
181    private void addExtraData(K8sClient client, VmDefinition vmDef,
182            VmDefinition prevState) {
183        var extra = new VmExtraData(vmDef);
184
185        // Maintain (or initialize) the resetCount
186        extra.resetCount(
187            Optional.ofNullable(prevState).flatMap(VmDefinition::extra)
188                .map(VmExtraData::resetCount).orElse(0L));
189
190        // VM definition status changes before the pod terminates.
191        // This results in pod information being shown for a stopped
192        // VM which is irritating. So check condition first.
193        if (!vmDef.conditionStatus("Running").orElse(false)) {
194            return;
195        }
196
197        // Get pod and extract node information.
198        var podSearch = new ListOptions();
199        podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
200            + ",app.kubernetes.io/component=" + APP_NAME
201            + ",app.kubernetes.io/instance=" + vmDef.name());
202        try {
203            var podList
204                = K8sV1PodStub.list(client, namespace(), podSearch);
205            for (var podStub : podList) {
206                var nodeName = podStub.model().get().getSpec().getNodeName();
207                logger.fine(() -> "Adding node name " + nodeName
208                    + " to VM info for " + vmDef.name());
209                @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
210                var addrs = new ArrayList<String>();
211                podStub.model().get().getStatus().getPodIPs().stream()
212                    .map(ip -> ip.getIp()).forEach(addrs::add);
213                logger.fine(() -> "Adding node addresses " + addrs
214                    + " to VM info for " + vmDef.name());
215                extra.nodeInfo(nodeName, addrs);
216            }
217        } catch (ApiException e) {
218            logger.log(Level.WARNING, e,
219                () -> "Cannot access node information: " + e.getMessage());
220        }
221    }
222
223    /**
224     * Returns the VM data.
225     *
226     * @param event the event
227     */
228    @Handler
229    public void onGetVms(GetVms event) {
230        event.setResult(channelManager.channels().stream()
231            .filter(c -> event.name().isEmpty()
232                || c.vmDefinition().name().equals(event.name().get()))
233            .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
234                || !c.vmDefinition().permissionsFor(event.user().orElse(null),
235                    event.roles()).isEmpty())
236            .filter(c -> event.fromPool().isEmpty()
237                || c.vmDefinition().assignedFrom()
238                    .map(p -> p.equals(event.fromPool().get())).orElse(false))
239            .filter(c -> event.toUser().isEmpty()
240                || c.vmDefinition().assignedTo()
241                    .map(u -> u.equals(event.toUser().get())).orElse(false))
242            .map(c -> new VmData(c.vmDefinition(), c))
243            .toList());
244    }
245
246    /**
247     * Assign a VM if not already assigned.
248     *
249     * @param event the event
250     * @throws ApiException the api exception
251     * @throws InterruptedException 
252     */
253    @Handler
254    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
255    public void onAssignVm(AssignVm event)
256            throws ApiException, InterruptedException {
257        while (true) {
258            // Search for existing assignment.
259            var vmQuery = channelManager.channels().stream()
260                .filter(c -> c.vmDefinition().assignedFrom()
261                    .map(p -> p.equals(event.fromPool())).orElse(false))
262                .filter(c -> c.vmDefinition().assignedTo()
263                    .map(u -> u.equals(event.toUser())).orElse(false))
264                .findFirst();
265            if (vmQuery.isPresent()) {
266                var vmDef = vmQuery.get().vmDefinition();
267                event.setResult(new VmData(vmDef, vmQuery.get()));
268                return;
269            }
270
271            // Get the pool definition for checking possible assignment
272            VmPool vmPool = newEventPipeline().fire(new GetPools()
273                .withName(event.fromPool())).get().stream().findFirst()
274                .orElse(null);
275            if (vmPool == null) {
276                return;
277            }
278
279            // Find available VM.
280            vmQuery = channelManager.channels().stream()
281                .filter(c -> vmPool.isAssignable(c.vmDefinition()))
282                .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
283                    .assignmentLastUsed().orElse(Instant.ofEpochSecond(0)))
284                    .thenComparing(preferRunning))
285                .findFirst();
286
287            // None found
288            if (vmQuery.isEmpty()) {
289                return;
290            }
291
292            // Assign to user
293            var chosenVm = vmQuery.get();
294            var vmPipeline = chosenVm.pipeline();
295            if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment(
296                vmPool.name(), event.toUser()), chosenVm).get())
297                .orElse(false)) {
298                var vmDef = chosenVm.vmDefinition();
299                event.setResult(new VmData(vmDef, chosenVm));
300
301                // Make sure that a newly assigned VM is running.
302                chosenVm.pipeline().fire(new ModifyVm(vmDef.name(),
303                    "state", "Running", chosenVm));
304                return;
305            }
306        }
307    }
308
309    private static Comparator<VmChannel> preferRunning
310        = new Comparator<>() {
311            @Override
312            public int compare(VmChannel ch1, VmChannel ch2) {
313                if (ch1.vmDefinition().conditionStatus("Running").orElse(false)
314                    && !ch2.vmDefinition().conditionStatus("Running")
315                        .orElse(false)) {
316                    return -1;
317                }
318                return 0;
319            }
320        };
321
322}