001/*
002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.util.Watch;
025import java.io.IOException;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.concurrent.ConcurrentHashMap;
031import org.jdrupes.vmoperator.common.Constants.Crd;
032import org.jdrupes.vmoperator.common.Constants.Status;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sDynamicModel;
036import org.jdrupes.vmoperator.common.K8sDynamicModels;
037import org.jdrupes.vmoperator.common.K8sDynamicStub;
038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
039import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
040import org.jdrupes.vmoperator.common.VmDefinitionStub;
041import org.jdrupes.vmoperator.common.VmPool;
042import org.jdrupes.vmoperator.manager.events.GetPools;
043import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
044import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
045import org.jdrupes.vmoperator.util.GsonPtr;
046import org.jgrapes.core.Channel;
047import org.jgrapes.core.EventPipeline;
048import org.jgrapes.core.annotation.Handler;
049import org.jgrapes.core.events.Attached;
050
051/**
052 * Watches for changes of VM pools. Reports the changes using 
053 * {@link VmPoolChanged} events fired on a special pipeline to
054 * avoid concurrent change informations.
055 */
056@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
057public class PoolMonitor extends
058        AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {
059
060    private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
061    private EventPipeline poolPipeline;
062
063    /**
064     * Instantiates a new VM pool manager.
065     *
066     * @param componentChannel the component channel
067     */
068    public PoolMonitor(Channel componentChannel) {
069        super(componentChannel, K8sDynamicModel.class,
070            K8sDynamicModels.class);
071    }
072
073    /**
074     * On attached.
075     *
076     * @param event the event
077     */
078    @Handler
079    @SuppressWarnings("PMD.CompareObjectsWithEquals")
080    public void onAttached(Attached event) {
081        if (event.node() == this) {
082            poolPipeline = newEventPipeline();
083        }
084    }
085
086    @Override
087    protected void prepareMonitoring() throws IOException, ApiException {
088        client(new K8sClient());
089
090        // Get all our API versions
091        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL);
092        if (ctx.isEmpty()) {
093            logger.severe(() -> "Cannot get CRD context.");
094            return;
095        }
096        context(ctx.get());
097    }
098
099    @Override
100    protected void handleChange(K8sClient client,
101            Watch.Response<K8sDynamicModel> response) {
102
103        var type = ResponseType.valueOf(response.type);
104        var poolName = response.object.metadata().getName();
105
106        // When pool is deleted, save VMs in pending
107        if (type == ResponseType.DELETED) {
108            Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
109                pool.setUndefined();
110                if (pool.vms().isEmpty()) {
111                    pools.remove(poolName);
112                }
113                poolPipeline.fire(new VmPoolChanged(pool, true));
114            });
115            return;
116        }
117
118        // Get full definition
119        var poolModel = response.object;
120        if (poolModel.data() == null) {
121            // ADDED event does not provide data, see
122            // https://github.com/kubernetes-client/java/issues/3215
123            try {
124                poolModel = K8sDynamicStub.get(client(), context(), namespace(),
125                    poolModel.metadata().getName()).model().orElse(null);
126            } catch (ApiException e) {
127                return;
128            }
129        }
130
131        // Get pool and merge changes
132        var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
133        vmPool.defineFrom(client().getJSON().getGson().fromJson(
134            GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class));
135        poolPipeline.fire(new VmPoolChanged(vmPool));
136    }
137
138    /**
139     * Track VM definition changes.
140     *
141     * @param event the event
142     * @throws ApiException 
143     */
144    @Handler
145    public void onVmResourceChanged(VmResourceChanged event)
146            throws ApiException {
147        final var vmDef = event.vmDefinition();
148        final String vmName = vmDef.name();
149        switch (event.type()) {
150        case ADDED:
151            vmDef.<List<String>> fromSpec("pools")
152                .orElse(Collections.emptyList()).stream().forEach(p -> {
153                    pools.computeIfAbsent(p, k -> new VmPool(p))
154                        .vms().add(vmName);
155                    poolPipeline.fire(new VmPoolChanged(pools.get(p)));
156                });
157            break;
158        case DELETED:
159            pools.values().stream().forEach(p -> {
160                if (p.vms().remove(vmName)) {
161                    poolPipeline.fire(new VmPoolChanged(p));
162                }
163            });
164            return;
165        default:
166            break;
167        }
168
169        // Sync last usage to console state change if user matches
170        if (vmDef.assignment().map(Assignment::user)
171            .map(at -> at.equals(vmDef.consoleUser().orElse(null)))
172            .orElse(true)) {
173            return;
174        }
175
176        var ccChange = vmDef.condition("ConsoleConnected")
177            .map(cc -> cc.getLastTransitionTime().toInstant());
178        if (ccChange
179            .map(tt -> vmDef.assignment().map(Assignment::lastUsed)
180                .map(alu -> alu.isAfter(tt)).orElse(true))
181            .orElse(true)) {
182            return;
183        }
184        var vmStub = VmDefinitionStub.get(client(),
185            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
186            vmDef.namespace(), vmDef.name());
187        vmStub.updateStatus(from -> {
188            // TODO
189            JsonObject status = from.statusJson();
190            var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
191            assignment.set("lastUsed", ccChange.get().toString());
192            return status;
193        });
194    }
195
196    /**
197     * Return the requested pools.
198     *
199     * @param event the event
200     */
201    @Handler
202    public void onGetPools(GetPools event) {
203        event.setResult(pools.values().stream().filter(VmPool::isDefined)
204            .filter(p -> event.name().isEmpty()
205                || p.name().equals(event.name().get()))
206            .filter(p -> event.forUser().isEmpty() && event.forRoles().isEmpty()
207                || !p.permissionsFor(event.forUser().orElse(null),
208                    event.forRoles()).isEmpty())
209            .toList());
210    }
211}