001/*
002 * VM-Operator
003 * Copyright (C) 2023,2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.custom.V1Patch;
024import io.kubernetes.client.openapi.ApiException;
025import io.kubernetes.client.util.Watch;
026import io.kubernetes.client.util.generic.options.ListOptions;
027import java.io.IOException;
028import java.net.HttpURLConnection;
029import java.time.Instant;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.Optional;
033import java.util.Set;
034import java.util.stream.Collectors;
035import org.jdrupes.vmoperator.common.Constants.Crd;
036import org.jdrupes.vmoperator.common.Constants.Status;
037import org.jdrupes.vmoperator.common.K8s;
038import org.jdrupes.vmoperator.common.K8sClient;
039import org.jdrupes.vmoperator.common.K8sDynamicStub;
040import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
041import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
042import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
043import org.jdrupes.vmoperator.common.VmDefinition;
044import org.jdrupes.vmoperator.common.VmDefinitionStub;
045import org.jdrupes.vmoperator.common.VmDefinitions;
046import org.jdrupes.vmoperator.common.VmExtraData;
047import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
048import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
049import org.jdrupes.vmoperator.manager.events.ChannelManager;
050import org.jdrupes.vmoperator.manager.events.ModifyVm;
051import org.jdrupes.vmoperator.manager.events.PodChanged;
052import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
053import org.jdrupes.vmoperator.manager.events.VmChannel;
054import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
055import org.jdrupes.vmoperator.util.GsonPtr;
056import org.jgrapes.core.Channel;
057import org.jgrapes.core.Event;
058import org.jgrapes.core.EventPipeline;
059import org.jgrapes.core.annotation.Handler;
060
061/**
062 * Watches for changes of VM definitions. When a VM definition (CR)
063 * becomes known, is is registered with a {@link ChannelManager} and thus
064 * gets an associated {@link VmChannel} and an associated
065 * {@link EventPipeline}.
066 * 
067 * The {@link EventPipeline} is used for submitting an action that processes
068 * the change data from kubernetes, eventually transforming it to a
069 * {@link VmResourceChanged} event that is handled by another
070 * {@link EventPipeline} associated with the {@link VmChannel}. This
071 * event pipeline should be used for all events related to changes of
072 * a particular VM.
073 */
074public class VmMonitor extends
075        AbstractMonitor<VmDefinition, VmDefinitions, VmChannel> {
076
077    private final ChannelManager<String, VmChannel,
078            EventPipeline> channelManager;
079
080    /**
081     * Instantiates a new VM definition watcher.
082     *
083     * @param componentChannel the component channel
084     * @param channelManager the channel manager
085     */
086    public VmMonitor(Channel componentChannel,
087            ChannelManager<String, VmChannel, EventPipeline> channelManager) {
088        super(componentChannel, VmDefinition.class,
089            VmDefinitions.class);
090        this.channelManager = channelManager;
091    }
092
093    @Override
094    protected void prepareMonitoring() throws IOException, ApiException {
095        client(new K8sClient());
096
097        // Get all our API versions
098        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM);
099        if (ctx.isEmpty()) {
100            logger.severe(() -> "Cannot get CRD context.");
101            return;
102        }
103        context(ctx.get());
104
105        // Remove left over resources
106        purge();
107    }
108
109    private void purge() throws ApiException {
110        // Get existing CRs (VMs)
111        var known = K8sDynamicStub.list(client(), context(), namespace())
112            .stream().map(stub -> stub.name()).collect(Collectors.toSet());
113        ListOptions opts = new ListOptions();
114        opts.setLabelSelector(
115            "app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
116                + "app.kubernetes.io/name=" + APP_NAME);
117        for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
118            K8sV1ConfigMapStub.CONTEXT)) {
119            for (var resStub : K8sDynamicStub.list(client(), context,
120                namespace(), opts)) {
121                String instance = resStub.model()
122                    .map(m -> m.metadata().getName()).orElse("(unknown)");
123                if (!known.contains(instance)) {
124                    resStub.delete();
125                }
126            }
127        }
128    }
129
130    @Override
131    protected void handleChange(K8sClient client,
132            Watch.Response<VmDefinition> response) {
133        var name = response.object.getMetadata().getName();
134
135        // Process the response data on a VM specific pipeline to
136        // increase concurrency when e.g. starting many VMs.
137        var preparing = channelManager.associated(name)
138            .orElseGet(() -> newEventPipeline());
139        preparing.submit("VmChange[" + name + "]",
140            () -> processChange(client, response, preparing));
141    }
142
143    private void processChange(K8sClient client,
144            Watch.Response<VmDefinition> response, EventPipeline preparing) {
145        // Get full definition and associate with channel as backup
146        var vmDef = response.object;
147        if (vmDef.data() == null) {
148            // ADDED event does not provide data, see
149            // https://github.com/kubernetes-client/java/issues/3215
150            vmDef = getModel(client, vmDef);
151        }
152        var name = response.object.getMetadata().getName();
153        var channel = channelManager.channel(name)
154            .orElseGet(() -> channelManager.createChannel(name));
155        if (vmDef.data() != null) {
156            // New data, augment and save
157            addExtraData(vmDef, channel.vmDefinition());
158            channel.setVmDefinition(vmDef);
159        } else {
160            // Reuse cached (e.g. if deleted)
161            vmDef = channel.vmDefinition();
162        }
163        if (vmDef == null) {
164            logger.warning(() -> "Cannot get defintion for "
165                + response.object.getMetadata());
166            return;
167        }
168        channelManager.put(name, channel, preparing);
169
170        // Create and fire changed event. Remove channel from channel
171        // manager on completion.
172        VmResourceChanged chgEvt
173            = new VmResourceChanged(ResponseType.valueOf(response.type), vmDef,
174                channel.setGeneration(response.object.getMetadata()
175                    .getGeneration()),
176                false);
177        if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
178            chgEvt = Event.onCompletion(chgEvt,
179                e -> channelManager.remove(e.vmDefinition().name()));
180        }
181        channel.fire(chgEvt);
182    }
183
184    private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
185        try {
186            return VmDefinitionStub.get(client, context(), namespace(),
187                vmDef.metadata().getName()).model().orElse(null);
188        } catch (ApiException e) {
189            return null;
190        }
191    }
192
193    private void addExtraData(VmDefinition vmDef, VmDefinition prevState) {
194        var extra = new VmExtraData(vmDef);
195        var prevExtra = Optional.ofNullable(prevState).map(VmDefinition::extra);
196
197        // Maintain (or initialize) the resetCount
198        extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L));
199
200        // Maintain node info
201        prevExtra
202            .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses()));
203    }
204
205    /**
206     * On pod changed.
207     *
208     * @param event the event
209     * @param channel the channel
210     */
211    @Handler
212    public void onPodChanged(PodChanged event, VmChannel channel) {
213        var vmDef = channel.vmDefinition();
214
215        // Make sure that this is properly sync'd with VM CR changes.
216        channelManager.associated(vmDef.name())
217            .orElseGet(() -> activeEventPipeline())
218            .submit("NodeInfo[" + vmDef.name() + "]",
219                () -> {
220                    updateNodeInfo(event, vmDef);
221                    channel.fire(new VmResourceChanged(ResponseType.MODIFIED,
222                        vmDef, false, true));
223                });
224    }
225
226    private void updateNodeInfo(PodChanged event, VmDefinition vmDef) {
227        var extra = vmDef.extra();
228        if (event.type() == ResponseType.DELETED) {
229            // The status of a deleted pod is the status before deletion,
230            // i.e. the node info is still cached and must be removed.
231            extra.nodeInfo("", Collections.emptyList());
232            return;
233        }
234
235        // Get current node info from pod
236        var pod = event.pod();
237        var nodeName = Optional
238            .ofNullable(pod.getSpec().getNodeName()).orElse("");
239        logger.finer(() -> "Adding node name " + nodeName
240            + " to VM info for " + vmDef.name());
241        var addrs = new ArrayList<String>();
242        Optional.ofNullable(pod.getStatus().getPodIPs())
243            .orElse(Collections.emptyList()).stream()
244            .map(ip -> ip.getIp()).forEach(addrs::add);
245        logger.finer(() -> "Adding node addresses " + addrs
246            + " to VM info for " + vmDef.name());
247        extra.nodeInfo(nodeName, addrs);
248    }
249
250    /**
251     * On modify vm.
252     *
253     * @param event the event
254     * @throws ApiException the api exception
255     * @throws IOException Signals that an I/O exception has occurred.
256     */
257    @Handler
258    public void onModifyVm(ModifyVm event, VmChannel channel)
259            throws ApiException, IOException {
260        patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(),
261            event.value());
262    }
263
264    private void patchVmDef(K8sClient client, String name, String path,
265            Object value) throws ApiException, IOException {
266        var vmStub = K8sDynamicStub.get(client,
267            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(),
268            name);
269
270        // Patch running
271        String valueAsText = value instanceof String
272            ? "\"" + value + "\""
273            : value.toString();
274        var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
275            new V1Patch("[{\"op\": \"replace\", \"path\": \"/"
276                + path + "\", \"value\": " + valueAsText + "}]"),
277            client.defaultPatchOptions());
278        if (!res.isPresent()) {
279            logger.warning(
280                () -> "Cannot patch definition for Vm " + vmStub.name());
281        }
282    }
283
284    /**
285     * Attempt to Update the assignment information in the status of the
286     * VM CR. Returns true if successful. The handler does not attempt
287     * retries, because in case of failure it will be necessary to
288     * re-evaluate the chosen VM.
289     *
290     * @param event the event
291     * @param channel the channel
292     * @throws ApiException the api exception
293     */
294    @Handler
295    public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
296            throws ApiException {
297        try {
298            var vmDef = channel.vmDefinition();
299            var vmStub = VmDefinitionStub.get(channel.client(),
300                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
301                vmDef.namespace(), vmDef.name());
302            if (vmStub.updateStatus(vmDef, from -> {
303                JsonObject status = from.statusJson();
304                if (event.toUser() == null) {
305                    ((JsonObject) GsonPtr.to(status).get())
306                        .remove(Status.ASSIGNMENT);
307                } else {
308                    var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
309                    assignment.set("pool", event.fromPool().name());
310                    assignment.set("user", event.toUser());
311                    assignment.set("lastUsed", Instant.now().toString());
312                }
313                return status;
314            }).isPresent()) {
315                event.setResult(true);
316            }
317        } catch (ApiException e) {
318            // Log exceptions except for conflict, which can be expected
319            if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) {
320                throw e;
321            }
322        }
323        event.setResult(false);
324    }
325
326}