002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
019package org.jdrupes.vmoperator.manager;
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.util.Watch;
025import java.io.IOException;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.concurrent.ConcurrentHashMap;
031import org.jdrupes.vmoperator.common.Constants.Crd;
032import org.jdrupes.vmoperator.common.Constants.Status;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sDynamicModel;
036import org.jdrupes.vmoperator.common.K8sDynamicModels;
037import org.jdrupes.vmoperator.common.K8sDynamicStub;
038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
039import org.jdrupes.vmoperator.common.VmDefinitionStub;
040import org.jdrupes.vmoperator.common.VmPool;
041import org.jdrupes.vmoperator.manager.events.GetPools;
042import org.jdrupes.vmoperator.manager.events.VmDefChanged;
043import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
044import org.jdrupes.vmoperator.util.GsonPtr;
045import org.jgrapes.core.Channel;
046import org.jgrapes.core.EventPipeline;
047import org.jgrapes.core.annotation.Handler;
048import org.jgrapes.core.events.Attached;
051 * Watches for changes of VM pools. Reports the changes using 
052 * {@link VmPoolChanged} events fired on a special pipeline to
053 * avoid concurrent change informations.
054 */
055@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
056public class PoolMonitor extends
057        AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {
059    private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
060    private EventPipeline poolPipeline;
062    /**
063     * Instantiates a new VM pool manager.
064     *
065     * @param componentChannel the component channel
066     */
067    public PoolMonitor(Channel componentChannel) {
068        super(componentChannel, K8sDynamicModel.class,
069            K8sDynamicModels.class);
070    }
072    /**
073     * On attached.
074     *
075     * @param event the event
076     */
077    @Handler
078    @SuppressWarnings("PMD.CompareObjectsWithEquals")
079    public void onAttached(Attached event) {
080        if (event.node() == this) {
081            poolPipeline = newEventPipeline();
082        }
083    }
085    @Override
086    protected void prepareMonitoring() throws IOException, ApiException {
087        client(new K8sClient());
089        // Get all our API versions
090        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL);
091        if (ctx.isEmpty()) {
092            logger.severe(() -> "Cannot get CRD context.");
093            return;
094        }
095        context(ctx.get());
096    }
098    @Override
099    protected void handleChange(K8sClient client,
100            Watch.Response<K8sDynamicModel> response) {
102        var type = ResponseType.valueOf(response.type);
103        var poolName = response.object.metadata().getName();
105        // When pool is deleted, save VMs in pending
106        if (type == ResponseType.DELETED) {
107            Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
108                pool.setDefined(false);
109                if (pool.vms().isEmpty()) {
110                    pools.remove(poolName);
111                }
112                poolPipeline.fire(new VmPoolChanged(pool, true));
113            });
114            return;
115        }
117        // Get full definition
118        var poolModel = response.object;
119        if (poolModel.data() == null) {
120            // ADDED event does not provide data, see
121            // https://github.com/kubernetes-client/java/issues/3215
122            try {
123                poolModel = K8sDynamicStub.get(client(), context(), namespace(),
124                    poolModel.metadata().getName()).model().orElse(null);
125            } catch (ApiException e) {
126                return;
127            }
128        }
130        // Get pool and merge changes
131        var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
132        var newData = client().getJSON().getGson().fromJson(
133            GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class);
134        vmPool.setRetention(newData.retention());
135        vmPool.setPermissions(newData.permissions());
136        vmPool.setDefined(true);
137        poolPipeline.fire(new VmPoolChanged(vmPool));
138    }
140    /**
141     * Track VM definition changes.
142     *
143     * @param event the event
144     * @throws ApiException 
145     */
146    @Handler
147    public void onVmDefChanged(VmDefChanged event) throws ApiException {
148        final var vmDef = event.vmDefinition();
149        final String vmName = vmDef.name();
150        switch (event.type()) {
151        case ADDED:
152            vmDef.<List<String>> fromSpec("pools")
153                .orElse(Collections.emptyList()).stream().forEach(p -> {
154                    pools.computeIfAbsent(p, k -> new VmPool(p))
155                        .vms().add(vmName);
156                    poolPipeline.fire(new VmPoolChanged(pools.get(p)));
157                });
158            break;
159        case DELETED:
160            pools.values().stream().forEach(p -> {
161                if (p.vms().remove(vmName)) {
162                    poolPipeline.fire(new VmPoolChanged(p));
163                }
164            });
165            return;
166        default:
167            break;
168        }
170        // Sync last usage to console state change if user matches
171        if (vmDef.assignedTo()
172            .map(at -> at.equals(vmDef.consoleUser().orElse(null)))
173            .orElse(true)) {
174            return;
175        }
177        var ccChange = vmDef.condition("ConsoleConnected")
178            .map(cc -> cc.getLastTransitionTime().toInstant());
179        if (ccChange
180            .map(tt -> vmDef.assignmentLastUsed().map(alu -> alu.isAfter(tt))
181                .orElse(true))
182            .orElse(true)) {
183            return;
184        }
185        var vmStub = VmDefinitionStub.get(client(),
186            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
187            vmDef.namespace(), vmDef.name());
188        vmStub.updateStatus(from -> {
189            // TODO
190            JsonObject status = from.statusJson();
191            var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
192            assignment.set("lastUsed", ccChange.get().toString());
193            return status;
194        });
195    }
197    /**
198     * Return the requested pools.
199     *
200     * @param event the event
201     */
202    @Handler
203    public void onGetPools(GetPools event) {
204        event.setResult(pools.values().stream().filter(VmPool::isDefined)
205            .filter(p -> event.name().isEmpty()
206                || p.name().equals(event.name().get()))
207            .filter(p -> event.forUser().isEmpty() && event.forRoles().isEmpty()
208                || !p.permissionsFor(event.forUser().orElse(null),
209                    event.forRoles()).isEmpty())
210            .toList());
211    }