001/*
002 * VM-Operator
003 * Copyright (C) 2023, 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.openapi.Configuration;
025import java.io.IOException;
026import java.nio.file.Files;
027import java.nio.file.Path;
028import java.time.Instant;
029import java.util.Comparator;
030import java.util.Optional;
031import java.util.logging.Level;
032import org.jdrupes.vmoperator.common.Constants.Crd;
033import org.jdrupes.vmoperator.common.Constants.Status;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
036import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
037import org.jdrupes.vmoperator.common.VmDefinitionStub;
038import org.jdrupes.vmoperator.common.VmPool;
039import org.jdrupes.vmoperator.manager.events.AssignVm;
040import org.jdrupes.vmoperator.manager.events.ChannelManager;
041import org.jdrupes.vmoperator.manager.events.Exit;
042import org.jdrupes.vmoperator.manager.events.GetPools;
043import org.jdrupes.vmoperator.manager.events.GetVms;
044import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
045import org.jdrupes.vmoperator.manager.events.ModifyVm;
046import org.jdrupes.vmoperator.manager.events.PodChanged;
047import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
048import org.jdrupes.vmoperator.manager.events.VmChannel;
049import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
050import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
051import org.jgrapes.core.Channel;
052import org.jgrapes.core.Component;
053import org.jgrapes.core.EventPipeline;
054import org.jgrapes.core.annotation.Handler;
055import org.jgrapes.core.events.HandlingError;
056import org.jgrapes.core.events.Start;
057import org.jgrapes.util.events.ConfigurationUpdate;
058
059/**
060 * Implements a controller as defined in the
061 * [Operator Whitepaper](https://github.com/cncf/tag-app-delivery/blob/eece8f7307f2970f46f100f51932db106db46968/operator-wg/whitepaper/Operator-WhitePaper_v1-0.md#operator-components-in-kubernetes).
062 * 
063 * The implementation splits the controller in two components. The
064 * {@link VmMonitor} and the {@link Reconciler}. The former watches
065 * the VM definitions (CRs) and generates {@link VmResourceChanged} events
066 * when they change. The latter handles the changes and reconciles the
067 * resources in the cluster.
068 * 
069 * The controller itself supports a single configuration property:
070 * ```yaml
071 * "/Manager":
072 *   "/Controller":
073 *     namespace: vmop-dev
074 * ```
075 * This may only be set when running the Manager (and thus the Controller)
076 * outside a container during development.  
077 * 
078 * ![Controller components](controller-components.svg)
079 * 
080 * @startuml controller-components.svg
081 * skinparam component {
082 *   BackGroundColor #FEFECE
083 *   BorderColor #A80036
084 *   BorderThickness 1.25
085 *   BackgroundColor<<internal>> #F1F1F1
086 *   BorderColor<<internal>> #181818
087 *   BorderThickness<<internal>> 1
088 * }
089 * 
090 * [Controller]
091 * [Controller] *--> [VmWatcher]
092 * [Controller] *--> [Reconciler]
093 * @enduml
094 */
095public class Controller extends Component {
096
097    private String namespace;
098    private final ChannelManager<String, VmChannel, EventPipeline> chanMgr;
099
100    /**
101     * Creates a new instance.
102     */
103    @SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
104    public Controller(Channel componentChannel) {
105        super(componentChannel);
106        // Prepare component tree
107        chanMgr = new ChannelManager<>(name -> {
108            try {
109                return new VmChannel(channel(), newEventPipeline(),
110                    new K8sClient());
111            } catch (IOException e) {
112                logger.log(Level.SEVERE, e, () -> "Failed to create client"
113                    + " for handling changes: " + e.getMessage());
114                return null;
115            }
116        });
117        attach(new VmMonitor(channel(), chanMgr));
118        attach(new DisplaySecretMonitor(channel(), chanMgr));
119        // Currently, we don't use the IP assigned by the load balancer
120        // to access the VM's console. Might change in the future.
121        // attach(new ServiceMonitor(channel()).channelManager(chanMgr));
122        attach(new Reconciler(channel()));
123        attach(new PoolMonitor(channel()));
124        attach(new PodMonitor(channel(), chanMgr));
125    }
126
127    /**
128     * Special handling of {@link ApiException} thrown by handlers.
129     *
130     * @param event the event
131     */
132    @Handler(channels = Channel.class)
133    public void onHandlingError(HandlingError event) {
134        if (event.throwable() instanceof ApiException exc) {
135            logger.log(Level.WARNING, exc,
136                () -> "Problem accessing kubernetes: " + exc.getResponseBody());
137            event.stop();
138        }
139    }
140
141    /**
142     * Configure the component.
143     *
144     * @param event the event
145     */
146    @Handler
147    public void onConfigurationUpdate(ConfigurationUpdate event) {
148        event.structured(componentPath()).ifPresent(c -> {
149            if (c.containsKey("namespace")) {
150                namespace = (String) c.get("namespace");
151            }
152        });
153    }
154
155    /**
156     * Handle the start event. Has higher priority because it configures
157     * the default Kubernetes client.
158     *
159     * @param event the event
160     * @throws IOException 
161     * @throws ApiException 
162     */
163    @Handler(priority = 100)
164    public void onStart(Start event) throws IOException, ApiException {
165        // Make sure to use thread specific client
166        // https://github.com/kubernetes-client/java/issues/100
167        Configuration.setDefaultApiClient(null);
168
169        // Verify that a namespace has been configured
170        if (namespace == null) {
171            var path = Path
172                .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
173            if (Files.isReadable(path)) {
174                namespace = Files.lines(path).findFirst().orElse(null);
175                fire(new ConfigurationUpdate().add(componentPath(), "namespace",
176                    namespace));
177            }
178        }
179        if (namespace == null) {
180            logger.severe(() -> "Namespace to control not configured and"
181                + " no file in kubernetes directory.");
182            event.cancel(true);
183            fire(new Exit(2));
184            return;
185        }
186        logger.config(() -> "Controlling namespace \"" + namespace + "\".");
187    }
188
189    /**
190     * Returns the VM data.
191     *
192     * @param event the event
193     */
194    @Handler
195    public void onGetVms(GetVms event) {
196        event.setResult(chanMgr.channels().stream()
197            .filter(c -> event.name().isEmpty()
198                || c.vmDefinition().name().equals(event.name().get()))
199            .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
200                || !c.vmDefinition().permissionsFor(event.user().orElse(null),
201                    event.roles()).isEmpty())
202            .filter(c -> event.fromPool().isEmpty()
203                || c.vmDefinition().assignment().map(Assignment::pool)
204                    .map(p -> p.equals(event.fromPool().get())).orElse(false))
205            .filter(c -> event.toUser().isEmpty()
206                || c.vmDefinition().assignment().map(Assignment::user)
207                    .map(u -> u.equals(event.toUser().get())).orElse(false))
208            .map(c -> new VmData(c.vmDefinition(), c))
209            .toList());
210    }
211
212    /**
213     * Assign a VM if not already assigned.
214     *
215     * @param event the event
216     * @throws ApiException the api exception
217     * @throws InterruptedException 
218     */
219    @Handler
220    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
221    public void onAssignVm(AssignVm event)
222            throws ApiException, InterruptedException {
223        while (true) {
224            // Search for existing assignment.
225            var vmQuery = chanMgr.channels().stream()
226                .filter(c -> c.vmDefinition().assignment().map(Assignment::pool)
227                    .map(p -> p.equals(event.fromPool())).orElse(false))
228                .filter(c -> c.vmDefinition().assignment().map(Assignment::user)
229                    .map(u -> u.equals(event.toUser())).orElse(false))
230                .findFirst();
231            if (vmQuery.isPresent()) {
232                var vmDef = vmQuery.get().vmDefinition();
233                event.setResult(new VmData(vmDef, vmQuery.get()));
234                return;
235            }
236
237            // Get the pool definition for checking possible assignment
238            VmPool vmPool = newEventPipeline().fire(new GetPools()
239                .withName(event.fromPool())).get().stream().findFirst()
240                .orElse(null);
241            if (vmPool == null) {
242                return;
243            }
244
245            // Find available VM.
246            vmQuery = chanMgr.channels().stream()
247                .filter(c -> vmPool.isAssignable(c.vmDefinition()))
248                .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
249                    .assignment().map(Assignment::lastUsed)
250                    .orElse(Instant.ofEpochSecond(0)))
251                    .thenComparing(preferRunning))
252                .findFirst();
253
254            // None found
255            if (vmQuery.isEmpty()) {
256                return;
257            }
258
259            // Assign to user
260            var chosenVm = vmQuery.get();
261            if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
262                vmPool, event.toUser())).get()).orElse(false)) {
263                var vmDef = chosenVm.vmDefinition();
264                event.setResult(new VmData(vmDef, chosenVm));
265
266                // Make sure that a newly assigned VM is running.
267                chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running"));
268                return;
269            }
270        }
271    }
272
273    private static Comparator<VmChannel> preferRunning
274        = new Comparator<>() {
275            @Override
276            public int compare(VmChannel ch1, VmChannel ch2) {
277                if (ch1.vmDefinition().conditionStatus("Running").orElse(false)
278                    && !ch2.vmDefinition().conditionStatus("Running")
279                        .orElse(false)) {
280                    return -1;
281                }
282                return 0;
283            }
284        };
285
286    /**
287     * When s pool is deleted, remove all related assignments.
288     *
289     * @param event the event
290     * @throws InterruptedException 
291     */
292    @Handler
293    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
294    public void onPoolChanged(VmPoolChanged event) throws InterruptedException {
295        if (!event.deleted()) {
296            return;
297        }
298        var vms = newEventPipeline()
299            .fire(new GetVms().assignedFrom(event.vmPool().name())).get();
300        for (var vm : vms) {
301            vm.channel().fire(new UpdateAssignment(event.vmPool(), null));
302        }
303    }
304
305    /**
306     * Remove runner version from status when pod is deleted
307     *
308     * @param event the event
309     * @param channel the channel
310     * @throws ApiException the api exception
311     */
312    @Handler
313    public void onPodChange(PodChanged event, VmChannel channel)
314            throws ApiException {
315        if (event.type() == ResponseType.DELETED) {
316            // Remove runner info from status
317            var vmDef = channel.vmDefinition();
318            var vmStub = VmDefinitionStub.get(channel.client(),
319                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
320                vmDef.namespace(), vmDef.name());
321            vmStub.updateStatus(from -> {
322                JsonObject status = from.statusJson();
323                status.remove(Status.RUNNER_VERSION);
324                return status;
325            });
326        }
327    }
328}