001/*
002 * VM-Operator
003 * Copyright (C) 2023,2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmDefinitions;
import org.jdrupes.vmoperator.common.VmExtraData;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Event;
import org.jgrapes.core.annotation.Handler;
061
062/**
063 * Watches for changes of VM definitions.
064 */
065@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
066public class VmMonitor extends
067        AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> {
068
069    private final ChannelManager<String, VmChannel, ?> channelManager;
070
071    /**
072     * Instantiates a new VM definition watcher.
073     *
074     * @param componentChannel the component channel
075     * @param channelManager the channel manager
076     */
077    public VmMonitor(Channel componentChannel,
078            ChannelManager<String, VmChannel, ?> channelManager) {
079        super(componentChannel, VmDefinition.class,
080            VmDefinitions.class);
081        this.channelManager = channelManager;
082    }
083
084    @Override
085    protected void prepareMonitoring() throws IOException, ApiException {
086        client(new K8sClient());
087
088        // Get all our API versions
089        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM);
090        if (ctx.isEmpty()) {
091            logger.severe(() -> "Cannot get CRD context.");
092            return;
093        }
094        context(ctx.get());
095
096        // Remove left over resources
097        purge();
098    }
099
100    @SuppressWarnings("PMD.CognitiveComplexity")
101    private void purge() throws ApiException {
102        // Get existing CRs (VMs)
103        var known = K8sDynamicStub.list(client(), context(), namespace())
104            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
105        ListOptions opts = new ListOptions();
106        opts.setLabelSelector(
107            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
108                + "app.kubernetes.io/name=" + APP_NAME);
109        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
110            K8sV1ConfigMapStub.CONTEXT)) {
111            for (var resStub : K8sDynamicStub.list(client(), context,
112                namespace(), opts)) {
113                String instance = resStub.model()
114                    .map(m -> m.metadata().getName()).orElse("(unknown)");
115                if (!known.contains(instance)) {
116                    resStub.delete();
117                }
118            }
119        }
120    }
121
122    @Override
123    protected void handleChange(K8sClient client,
124            Watch.Response<VmDefinition> response) {
125        V1ObjectMeta metadata = response.object.getMetadata();
126        AtomicBoolean toBeAdded = new AtomicBoolean(false);
127        VmChannel channel = channelManager.channel(metadata.getName())
128            .orElseGet(() -> {
129                toBeAdded.set(true);
130                return channelManager.createChannel(metadata.getName());
131            });
132
133        // Get full definition and associate with channel as backup
134        var vmDef = response.object;
135        if (vmDef.data() == null) {
136            // ADDED event does not provide data, see
137            // https://github.com/kubernetes-client/java/issues/3215
138            vmDef = getModel(client, vmDef);
139        }
140        if (vmDef.data() != null) {
141            // New data, augment and save
142            addExtraData(vmDef, channel.vmDefinition());
143            channel.setVmDefinition(vmDef);
144        } else {
145            // Reuse cached (e.g. if deleted)
146            vmDef = channel.vmDefinition();
147        }
148        if (vmDef == null) {
149            logger.warning(() -> "Cannot get defintion for "
150                + response.object.getMetadata());
151            return;
152        }
153        if (toBeAdded.get()) {
154            channelManager.put(vmDef.name(), channel);
155        }
156
157        // Create and fire changed event. Remove channel from channel
158        // manager on completion.
159        VmResourceChanged chgEvt
160            = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef,
161                channel.setGeneration(response.object.getMetadata()
162                    .getGeneration()),
163                false);
164        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
165            chgEvt = Event.onCompletion(chgEvt,
166                e -> channelManager.remove(e.vmDefinition().name()));
167        }
168        channel.fire(chgEvt);
169    }
170
171    private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
172        try {
173            return VmDefinitionStub.get(client, context(), namespace(),
174                vmDef.metadata().getName()).model().orElse(null);
175        } catch (ApiException e) {
176            return null;
177        }
178    }
179
180    @SuppressWarnings("PMD.AvoidDuplicateLiterals")
181    private void addExtraData(VmDefinition vmDef, VmDefinition prevState) {
182        var extra = new VmExtraData(vmDef);
183        var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra);
184
185        // Maintain (or initialize) the resetCount
186        extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L));
187
188        // Maintain node info
189        prevExtra
190            .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses()));
191    }
192
193    /**
194     * On pod changed.
195     *
196     * @param event the event
197     * @param channel the channel
198     */
199    @Handler
200    public void onPodChanged(PodChanged event, VmChannel channel) {
201        var vmDef = channel.vmDefinition();
202        updateNodeInfo(event, vmDef);
203        channel
204            .fire(new VmResourceChanged(ResponseType.MODIFIED, vmDef, false, true));
205    }
206
207    private void updateNodeInfo(PodChanged event, VmDefinition vmDef) {
208        var extra = vmDef.extra();
209        if (event.type() == ResponseType.DELETED) {
210            // The status of a deleted pod is the status before deletion,
211            // i.e. the node info is still cached and must be removed.
212            extra.nodeInfo("", Collections.emptyList());
213            return;
214        }
215
216        // Get current node info from pod
217        var pod = event.pod();
218        var nodeName = Optional
219            .ofNullable(pod.getSpec().getNodeName()).orElse("");
220        logger.finer(() -> "Adding node name " + nodeName
221            + " to VM info for " + vmDef.name());
222        @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
223        var addrs = new ArrayList<String>();
224        Optional.ofNullable(pod.getStatus().getPodIPs())
225            .orElse(Collections.emptyList()).stream()
226            .map(ip -> ip.getIp()).forEach(addrs::add);
227        logger.finer(() -> "Adding node addresses " + addrs
228            + " to VM info for " + vmDef.name());
229        extra.nodeInfo(nodeName, addrs);
230    }
231
232    /**
233     * On modify vm.
234     *
235     * @param event the event
236     * @throws ApiException the api exception
237     * @throws IOException Signals that an I/O exception has occurred.
238     */
239    @Handler
240    public void onModifyVm(ModifyVm event, VmChannel channel)
241            throws ApiException, IOException {
242        patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(),
243            event.value());
244    }
245
246    private void patchVmDef(K8sClient client, String name, String path,
247            Object value) throws ApiException, IOException {
248        var vmStub = K8sDynamicStub.get(client,
249            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(),
250            name);
251
252        // Patch running
253        String valueAsText = value instanceof String
254            ? "\"" + value + "\""
255            : value.toString();
256        var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
257            new V1Patch("[{\"op\": \"replace\", \"path\": \"/"
258                + path + "\", \"value\": " + valueAsText + "}]"),
259            client.defaultPatchOptions());
260        if (!res.isPresent()) {
261            logger.warning(
262                () -> "Cannot patch definition for Vm " + vmStub.name());
263        }
264    }
265
266    /**
267     * Attempt to Update the assignment information in the status of the
268     * VM CR. Returns true if successful. The handler does not attempt
269     * retries, because in case of failure it will be necessary to
270     * re-evaluate the chosen VM.
271     *
272     * @param event the event
273     * @param channel the channel
274     * @throws ApiException the api exception
275     */
276    @Handler
277    public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
278            throws ApiException {
279        try {
280            var vmDef = channel.vmDefinition();
281            var vmStub = VmDefinitionStub.get(channel.client(),
282                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
283                vmDef.namespace(), vmDef.name());
284            if (vmStub.updateStatus(vmDef, from -> {
285                JsonObject status = from.statusJson();
286                if (event.toUser() == null) {
287                    ((JsonObject) GsonPtr.to(status).get())
288                        .remove(Status.ASSIGNMENT);
289                } else {
290                    var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
291                    assignment.set("pool", event.fromPool().name());
292                    assignment.set("user", event.toUser());
293                    assignment.set("lastUsed", Instant.now().toString());
294                }
295                return status;
296            }).isPresent()) {
297                event.setResult(true);
298            }
299        } catch (ApiException e) {
300            // Log exceptions except for conflict, which can be expected
301            if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) {
302                throw e;
303            }
304        }
305        event.setResult(false);
306    }
307
308}