001/*
002 * VM-Operator
003 * Copyright (C) 2023, 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.openapi.Configuration;
025import java.io.IOException;
026import java.nio.file.Files;
027import java.nio.file.Path;
028import java.time.Instant;
029import java.util.Comparator;
030import java.util.Optional;
031import java.util.logging.Level;
032import org.jdrupes.vmoperator.common.Constants.Crd;
033import org.jdrupes.vmoperator.common.Constants.Status;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
036import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
037import org.jdrupes.vmoperator.common.VmDefinitionStub;
038import org.jdrupes.vmoperator.common.VmPool;
039import org.jdrupes.vmoperator.manager.events.AssignVm;
040import org.jdrupes.vmoperator.manager.events.ChannelManager;
041import org.jdrupes.vmoperator.manager.events.Exit;
042import org.jdrupes.vmoperator.manager.events.GetPools;
043import org.jdrupes.vmoperator.manager.events.GetVms;
044import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
045import org.jdrupes.vmoperator.manager.events.ModifyVm;
046import org.jdrupes.vmoperator.manager.events.PodChanged;
047import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
048import org.jdrupes.vmoperator.manager.events.VmChannel;
049import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
050import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
051import org.jgrapes.core.Channel;
052import org.jgrapes.core.Component;
053import org.jgrapes.core.annotation.Handler;
054import org.jgrapes.core.events.HandlingError;
055import org.jgrapes.core.events.Start;
056import org.jgrapes.util.events.ConfigurationUpdate;
057
058/**
059 * Implements a controller as defined in the
060 * [Operator Whitepaper](https://github.com/cncf/tag-app-delivery/blob/eece8f7307f2970f46f100f51932db106db46968/operator-wg/whitepaper/Operator-WhitePaper_v1-0.md#operator-components-in-kubernetes).
061 * 
062 * The implementation splits the controller in two components. The
063 * {@link VmMonitor} and the {@link Reconciler}. The former watches
064 * the VM definitions (CRs) and generates {@link VmResourceChanged} events
065 * when they change. The latter handles the changes and reconciles the
066 * resources in the cluster.
067 * 
068 * The controller itself supports a single configuration property:
069 * ```yaml
070 * "/Manager":
071 *   "/Controller":
072 *     namespace: vmop-dev
073 * ```
074 * This may only be set when running the Manager (and thus the Controller)
075 * outside a container during development.  
076 * 
077 * ![Controller components](controller-components.svg)
078 * 
079 * @startuml controller-components.svg
080 * skinparam component {
081 *   BackGroundColor #FEFECE
082 *   BorderColor #A80036
083 *   BorderThickness 1.25
084 *   BackgroundColor<<internal>> #F1F1F1
085 *   BorderColor<<internal>> #181818
086 *   BorderThickness<<internal>> 1
087 * }
088 * 
089 * [Controller]
090 * [Controller] *--> [VmWatcher]
091 * [Controller] *--> [Reconciler]
092 * @enduml
093 */
094public class Controller extends Component {
095
096    private String namespace;
097    private final ChannelManager<String, VmChannel, ?> chanMgr;
098
099    /**
100     * Creates a new instance.
101     */
102    @SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
103    public Controller(Channel componentChannel) {
104        super(componentChannel);
105        // Prepare component tree
106        chanMgr = new ChannelManager<>(name -> {
107            try {
108                return new VmChannel(channel(), newEventPipeline(),
109                    new K8sClient());
110            } catch (IOException e) {
111                logger.log(Level.SEVERE, e, () -> "Failed to create client"
112                    + " for handling changes: " + e.getMessage());
113                return null;
114            }
115        });
116        attach(new VmMonitor(channel(), chanMgr));
117        attach(new DisplaySecretMonitor(channel(), chanMgr));
118        // Currently, we don't use the IP assigned by the load balancer
119        // to access the VM's console. Might change in the future.
120        // attach(new ServiceMonitor(channel()).channelManager(chanMgr));
121        attach(new Reconciler(channel()));
122        attach(new PoolMonitor(channel()));
123        attach(new PodMonitor(channel(), chanMgr));
124    }
125
126    /**
127     * Special handling of {@link ApiException} thrown by handlers.
128     *
129     * @param event the event
130     */
131    @Handler(channels = Channel.class)
132    public void onHandlingError(HandlingError event) {
133        if (event.throwable() instanceof ApiException exc) {
134            logger.log(Level.WARNING, exc,
135                () -> "Problem accessing kubernetes: " + exc.getResponseBody());
136            event.stop();
137        }
138    }
139
140    /**
141     * Configure the component.
142     *
143     * @param event the event
144     */
145    @Handler
146    public void onConfigurationUpdate(ConfigurationUpdate event) {
147        event.structured(componentPath()).ifPresent(c -> {
148            if (c.containsKey("namespace")) {
149                namespace = (String) c.get("namespace");
150            }
151        });
152    }
153
154    /**
155     * Handle the start event. Has higher priority because it configures
156     * the default Kubernetes client.
157     *
158     * @param event the event
159     * @throws IOException 
160     * @throws ApiException 
161     */
162    @Handler(priority = 100)
163    public void onStart(Start event) throws IOException, ApiException {
164        // Make sure to use thread specific client
165        // https://github.com/kubernetes-client/java/issues/100
166        Configuration.setDefaultApiClient(null);
167
168        // Verify that a namespace has been configured
169        if (namespace == null) {
170            var path = Path
171                .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
172            if (Files.isReadable(path)) {
173                namespace = Files.lines(path).findFirst().orElse(null);
174                fire(new ConfigurationUpdate().add(componentPath(), "namespace",
175                    namespace));
176            }
177        }
178        if (namespace == null) {
179            logger.severe(() -> "Namespace to control not configured and"
180                + " no file in kubernetes directory.");
181            event.cancel(true);
182            fire(new Exit(2));
183            return;
184        }
185        logger.config(() -> "Controlling namespace \"" + namespace + "\".");
186    }
187
188    /**
189     * Returns the VM data.
190     *
191     * @param event the event
192     */
193    @Handler
194    public void onGetVms(GetVms event) {
195        event.setResult(chanMgr.channels().stream()
196            .filter(c -> event.name().isEmpty()
197                || c.vmDefinition().name().equals(event.name().get()))
198            .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
199                || !c.vmDefinition().permissionsFor(event.user().orElse(null),
200                    event.roles()).isEmpty())
201            .filter(c -> event.fromPool().isEmpty()
202                || c.vmDefinition().assignment().map(Assignment::pool)
203                    .map(p -> p.equals(event.fromPool().get())).orElse(false))
204            .filter(c -> event.toUser().isEmpty()
205                || c.vmDefinition().assignment().map(Assignment::user)
206                    .map(u -> u.equals(event.toUser().get())).orElse(false))
207            .map(c -> new VmData(c.vmDefinition(), c))
208            .toList());
209    }
210
211    /**
212     * Assign a VM if not already assigned.
213     *
214     * @param event the event
215     * @throws ApiException the api exception
216     * @throws InterruptedException 
217     */
218    @Handler
219    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
220    public void onAssignVm(AssignVm event)
221            throws ApiException, InterruptedException {
222        while (true) {
223            // Search for existing assignment.
224            var vmQuery = chanMgr.channels().stream()
225                .filter(c -> c.vmDefinition().assignment().map(Assignment::pool)
226                    .map(p -> p.equals(event.fromPool())).orElse(false))
227                .filter(c -> c.vmDefinition().assignment().map(Assignment::user)
228                    .map(u -> u.equals(event.toUser())).orElse(false))
229                .findFirst();
230            if (vmQuery.isPresent()) {
231                var vmDef = vmQuery.get().vmDefinition();
232                event.setResult(new VmData(vmDef, vmQuery.get()));
233                return;
234            }
235
236            // Get the pool definition for checking possible assignment
237            VmPool vmPool = newEventPipeline().fire(new GetPools()
238                .withName(event.fromPool())).get().stream().findFirst()
239                .orElse(null);
240            if (vmPool == null) {
241                return;
242            }
243
244            // Find available VM.
245            vmQuery = chanMgr.channels().stream()
246                .filter(c -> vmPool.isAssignable(c.vmDefinition()))
247                .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
248                    .assignment().map(Assignment::lastUsed)
249                    .orElse(Instant.ofEpochSecond(0)))
250                    .thenComparing(preferRunning))
251                .findFirst();
252
253            // None found
254            if (vmQuery.isEmpty()) {
255                return;
256            }
257
258            // Assign to user
259            var chosenVm = vmQuery.get();
260            if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
261                vmPool, event.toUser())).get()).orElse(false)) {
262                var vmDef = chosenVm.vmDefinition();
263                event.setResult(new VmData(vmDef, chosenVm));
264
265                // Make sure that a newly assigned VM is running.
266                chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running"));
267                return;
268            }
269        }
270    }
271
272    private static Comparator<VmChannel> preferRunning
273        = new Comparator<>() {
274            @Override
275            public int compare(VmChannel ch1, VmChannel ch2) {
276                if (ch1.vmDefinition().conditionStatus("Running").orElse(false)
277                    && !ch2.vmDefinition().conditionStatus("Running")
278                        .orElse(false)) {
279                    return -1;
280                }
281                return 0;
282            }
283        };
284
285    /**
286     * When s pool is deleted, remove all related assignments.
287     *
288     * @param event the event
289     * @throws InterruptedException 
290     */
291    @Handler
292    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
293    public void onPoolChanged(VmPoolChanged event) throws InterruptedException {
294        if (!event.deleted()) {
295            return;
296        }
297        var vms = newEventPipeline()
298            .fire(new GetVms().assignedFrom(event.vmPool().name())).get();
299        for (var vm : vms) {
300            vm.channel().fire(new UpdateAssignment(event.vmPool(), null));
301        }
302    }
303
304    /**
305     * Remove runner version from status when pod is deleted
306     *
307     * @param event the event
308     * @param channel the channel
309     * @throws ApiException the api exception
310     */
311    @Handler
312    public void onPodChange(PodChanged event, VmChannel channel)
313            throws ApiException {
314        if (event.type() == ResponseType.DELETED) {
315            // Remove runner info from status
316            var vmDef = channel.vmDefinition();
317            var vmStub = VmDefinitionStub.get(channel.client(),
318                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
319                vmDef.namespace(), vmDef.name());
320            vmStub.updateStatus(from -> {
321                JsonObject status = from.statusJson();
322                status.remove(Status.RUNNER_VERSION);
323                return status;
324            });
325        }
326    }
327}