001/*
002 * VM-Operator
003 * Copyright (C) 2023,2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmDefinitions;
import org.jdrupes.vmoperator.common.VmExtraData;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
060
061/**
062 * Watches for changes of VM definitions. When a VM definition (CR)
063 * becomes known, is is registered with a {@link ChannelManager} and thus
064 * gets an associated {@link VmChannel} and an associated
065 * {@link EventPipeline}.
066 * 
067 * The {@link EventPipeline} is used for submitting an action that processes
068 * the change data from kubernetes, eventually transforming it to a
069 * {@link VmResourceChanged} event that is handled by another
070 * {@link EventPipeline} associated with the {@link VmChannel}. This
071 * event pipeline should be used for all events related to changes of
072 * a particular VM.
073 */
074@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
075public class VmMonitor extends
076        AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> {
077
078    private final ChannelManager<String, VmChannel,
079            EventPipeline> channelManager;
080
081    /**
082     * Instantiates a new VM definition watcher.
083     *
084     * @param componentChannel the component channel
085     * @param channelManager the channel manager
086     */
087    public VmMonitor(Channel componentChannel,
088            ChannelManager<String, VmChannel, EventPipeline> channelManager) {
089        super(componentChannel, VmDefinition.class,
090            VmDefinitions.class);
091        this.channelManager = channelManager;
092    }
093
094    @Override
095    protected void prepareMonitoring() throws IOException, ApiException {
096        client(new K8sClient());
097
098        // Get all our API versions
099        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM);
100        if (ctx.isEmpty()) {
101            logger.severe(() -> "Cannot get CRD context.");
102            return;
103        }
104        context(ctx.get());
105
106        // Remove left over resources
107        purge();
108    }
109
110    @SuppressWarnings("PMD.CognitiveComplexity")
111    private void purge() throws ApiException {
112        // Get existing CRs (VMs)
113        var known = K8sDynamicStub.list(client(), context(), namespace())
114            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
115        ListOptions opts = new ListOptions();
116        opts.setLabelSelector(
117            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
118                + "app.kubernetes.io/name=" + APP_NAME);
119        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
120            K8sV1ConfigMapStub.CONTEXT)) {
121            for (var resStub : K8sDynamicStub.list(client(), context,
122                namespace(), opts)) {
123                String instance = resStub.model()
124                    .map(m -> m.metadata().getName()).orElse("(unknown)");
125                if (!known.contains(instance)) {
126                    resStub.delete();
127                }
128            }
129        }
130    }
131
132    @Override
133    protected void handleChange(K8sClient client,
134            Watch.Response<VmDefinition> response) {
135        var name = response.object.getMetadata().getName();
136
137        // Process the response data on a VM specific pipeline to
138        // increase concurrency when e.g. starting many VMs.
139        var preparing = channelManager.associated(name)
140            .orElseGet(() -> newEventPipeline());
141        preparing.submit("VmChange[" + name + "]",
142            () -> processChange(client, response, preparing));
143    }
144
145    private void processChange(K8sClient client,
146            Watch.Response<VmDefinition> response, EventPipeline preparing) {
147        // Get full definition and associate with channel as backup
148        var vmDef = response.object;
149        if (vmDef.data() == null) {
150            // ADDED event does not provide data, see
151            // https://github.com/kubernetes-client/java/issues/3215
152            vmDef = getModel(client, vmDef);
153        }
154        var name = response.object.getMetadata().getName();
155        var channel = channelManager.channel(name)
156            .orElseGet(() -> channelManager.createChannel(name));
157        if (vmDef.data() != null) {
158            // New data, augment and save
159            addExtraData(vmDef, channel.vmDefinition());
160            channel.setVmDefinition(vmDef);
161        } else {
162            // Reuse cached (e.g. if deleted)
163            vmDef = channel.vmDefinition();
164        }
165        if (vmDef == null) {
166            logger.warning(() -> "Cannot get defintion for "
167                + response.object.getMetadata());
168            return;
169        }
170        channelManager.put(name, channel, preparing);
171
172        // Create and fire changed event. Remove channel from channel
173        // manager on completion.
174        VmResourceChanged chgEvt
175            = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef,
176                channel.setGeneration(response.object.getMetadata()
177                    .getGeneration()),
178                false);
179        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
180            chgEvt = Event.onCompletion(chgEvt,
181                e -> channelManager.remove(e.vmDefinition().name()));
182        }
183        channel.fire(chgEvt);
184    }
185
186    private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
187        try {
188            return VmDefinitionStub.get(client, context(), namespace(),
189                vmDef.metadata().getName()).model().orElse(null);
190        } catch (ApiException e) {
191            return null;
192        }
193    }
194
195    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
196    private void addExtraData(VmDefinition vmDef, VmDefinition prevState) {
197        var extra = new VmExtraData(vmDef);
198        var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra);
199
200        // Maintain (or initialize) the resetCount
201        extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L));
202
203        // Maintain node info
204        prevExtra
205            .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses()));
206    }
207
208    /**
209     * On pod changed.
210     *
211     * @param event the event
212     * @param channel the channel
213     */
214    @Handler
215    public void onPodChanged(PodChanged event, VmChannel channel) {
216        var vmDef = channel.vmDefinition();
217
218        // Make sure that this is properly sync'd with VM CR changes.
219        channelManager.associated(vmDef.name())
220            .orElseGet(() -> activeEventPipeline())
221            .submit("NodeInfo[" + vmDef.name() + "]",
222                () -> {
223                    updateNodeInfo(event, vmDef);
224                    channel.fire(new VmResourceChanged(ResponseType.MODIFIED,
225                        vmDef, false, true));
226                });
227    }
228
229    private void updateNodeInfo(PodChanged event, VmDefinition vmDef) {
230        var extra = vmDef.extra();
231        if (event.type() == ResponseType.DELETED) {
232            // The status of a deleted pod is the status before deletion,
233            // i.e. the node info is still cached and must be removed.
234            extra.nodeInfo("", Collections.emptyList());
235            return;
236        }
237
238        // Get current node info from pod
239        var pod = event.pod();
240        var nodeName = Optional
241            .ofNullable(pod.getSpec().getNodeName()).orElse("");
242        logger.finer(() -> "Adding node name " + nodeName
243            + " to VM info for " + vmDef.name());
244        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
245        var addrs = new ArrayList<String>();
246        Optional.ofNullable(pod.getStatus().getPodIPs())
247            .orElse(Collections.emptyList()).stream()
248            .map(ip -> ip.getIp()).forEach(addrs::add);
249        logger.finer(() -> "Adding node addresses " + addrs
250            + " to VM info for " + vmDef.name());
251        extra.nodeInfo(nodeName, addrs);
252    }
253
254    /**
255     * On modify vm.
256     *
257     * @param event the event
258     * @throws ApiException the api exception
259     * @throws IOException Signals that an I/O exception has occurred.
260     */
261    @Handler
262    public void onModifyVm(ModifyVm event, VmChannel channel)
263            throws ApiException, IOException {
264        patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(),
265            event.value());
266    }
267
268    private void patchVmDef(K8sClient client, String name, String path,
269            Object value) throws ApiException, IOException {
270        var vmStub = K8sDynamicStub.get(client,
271            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(),
272            name);
273
274        // Patch running
275        String valueAsText = value instanceof String
276            ? "\"" + value + "\""
277            : value.toString();
278        var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
279            new V1Patch("[{\"op\": \"replace\", \"path\": \"/"
280                + path + "\", \"value\": " + valueAsText + "}]"),
281            client.defaultPatchOptions());
282        if (!res.isPresent()) {
283            logger.warning(
284                () -> "Cannot patch definition for Vm " + vmStub.name());
285        }
286    }
287
288    /**
289     * Attempt to Update the assignment information in the status of the
290     * VM CR. Returns true if successful. The handler does not attempt
291     * retries, because in case of failure it will be necessary to
292     * re-evaluate the chosen VM.
293     *
294     * @param event the event
295     * @param channel the channel
296     * @throws ApiException the api exception
297     */
298    @Handler
299    public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
300            throws ApiException {
301        try {
302            var vmDef = channel.vmDefinition();
303            var vmStub = VmDefinitionStub.get(channel.client(),
304                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
305                vmDef.namespace(), vmDef.name());
306            if (vmStub.updateStatus(vmDef, from -> {
307                JsonObject status = from.statusJson();
308                if (event.toUser() == null) {
309                    ((JsonObject) GsonPtr.to(status).get())
310                        .remove(Status.ASSIGNMENT);
311                } else {
312                    var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
313                    assignment.set("pool", event.fromPool().name());
314                    assignment.set("user", event.toUser());
315                    assignment.set("lastUsed", Instant.now().toString());
316                }
317                return status;
318            }).isPresent()) {
319                event.setResult(true);
320            }
321        } catch (ApiException e) {
322            // Log exceptions except for conflict, which can be expected
323            if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) {
324                throw e;
325            }
326        }
327        event.setResult(false);
328    }
329
330}