File: LazyReplicatedMap.java

package info (click to toggle)
tomcat10 10.1.52-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 47,900 kB
  • sloc: java: 375,756; xml: 59,410; jsp: 4,741; sh: 1,381; perl: 324; makefile: 25; ansic: 14
file content (235 lines) | stat: -rw-r--r-- 11,087 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.tipis;

import java.io.Serializable;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. One node is always the
 * primary and one node is always the backup. This map is synchronized across a cluster, and only has one backup
 * member.<br>
 * A perfect usage for this map would be a session map for a session manager in a clustered environment.<br>
 * The only way to modify this list is to use the <code>put, putAll, remove</code> methods. entrySet, entrySetFull,
 * keySet, keySetFull, returns all non modifiable sets.<br>
 * <br>
 * If objects (values) in the map change without invoking <code>put()</code> or <code>remove()</code> the data can be
 * distributed using two different methods:<br>
 * <code>replicate(boolean)</code> and <code>replicate(Object, boolean)</code><br>
 * These two methods are very important two understand. The map can work with two set of value objects:<br>
 * 1. Serializable - the entire object gets serialized each time it is replicated<br>
 * 2. ReplicatedMapEntry - this interface allows for a isDirty() flag and to replicate diffs if desired.<br>
 * Implementing the <code>ReplicatedMapEntry</code> interface allows you to decide what objects get replicated and how
 * much data gets replicated each time.<br>
 * If you implement a smart AOP mechanism to detect changes in underlying objects, you can replicate only those changes
 * by implementing the ReplicatedMapEntry interface, and return true when isDiffable() is invoked.<br>
 * <br>
 * This map implementation doesn't have a background thread running to replicate changes. If you do have changes without
 * invoking put/remove then you need to invoke one of the following methods:
 * <ul>
 * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
 * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
 * </ul>
 * the <code>boolean</code> value in the <code>replicate</code> method used to decide whether to only replicate objects
 * that implement the <code>ReplicatedMapEntry</code> interface or to replicate all objects. If an object doesn't
 * implement the <code>ReplicatedMapEntry</code> interface each time the object gets replicated the entire object gets
 * serialized, hence a call to <code>replicate(true)</code> will replicate all objects in this map that are using this
 * node as primary. <br>
 * <br>
 * <b>REMEMBER TO CALL</b> <code>breakdown()</code> when you are done with the map to avoid memory leaks.<br>
 * <br>
 * TODO implement periodic sync/transfer thread
 *
 * @param <K> The type of Key
 * @param <V> The type of Value
 */
public class LazyReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> {
    private static final long serialVersionUID = 1L;
    // Lazy init to support serialization
    private transient volatile Log log;


    // ------------------------------------------------------------------------------
    // CONSTRUCTORS / DESTRUCTORS
    // ------------------------------------------------------------------------------
    /**
     * Creates a new map
     *
     * @param owner           The map owner
     * @param channel         The channel to use for communication
     * @param timeout         long - timeout for RPC messages
     * @param mapContextName  String - unique name for this map, to allow multiple maps per channel
     * @param initialCapacity int - the size of this map, see HashMap
     * @param loadFactor      float - load factor, see HashMap
     * @param cls             Class loaders
     */
    public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,
            float loadFactor, ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls,
                true);
    }

    /**
     * Creates a new map
     *
     * @param owner           The map owner
     * @param channel         The channel to use for communication
     * @param timeout         long - timeout for RPC messages
     * @param mapContextName  String - unique name for this map, to allow multiple maps per channel
     * @param initialCapacity int - the size of this map, see HashMap
     * @param cls             Class loaders
     */
    public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,
            ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, initialCapacity, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, true);
    }

    /**
     * Creates a new map
     *
     * @param owner          The map owner
     * @param channel        The channel to use for communication
     * @param timeout        long - timeout for RPC messages
     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
     * @param cls            Class loaders
     */
    public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, true);
    }

    /**
     * Creates a new map
     *
     * @param owner          The map owner
     * @param channel        The channel to use for communication
     * @param timeout        long - timeout for RPC messages
     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
     * @param cls            Class loaders
     * @param terminate      boolean - Flag for whether to terminate this map that failed to start.
     */
    public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls,
            boolean terminate) {
        super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, terminate);
    }


    // ------------------------------------------------------------------------------
    // METHODS TO OVERRIDE
    // ------------------------------------------------------------------------------
    @Override
    protected int getStateMessageType() {
        return AbstractReplicatedMap.MapMessage.MSG_STATE;
    }

    @Override
    protected int getReplicateMessageType() {
        return AbstractReplicatedMap.MapMessage.MSG_BACKUP;
    }

    @Override
    protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
        Log log = getLog();
        if (!(key instanceof Serializable && value instanceof Serializable)) {
            return new Member[0];
        }
        Member[] members = getMapMembers();
        int firstIdx = getNextBackupIndex();
        int nextIdx = firstIdx;
        Member[] backup = new Member[0];

        // there are no backups
        if (members.length == 0 || firstIdx == -1) {
            return backup;
        }

        boolean success = false;
        do {
            // select a backup node
            Member next = members[nextIdx];

            // increment for the next round of back up selection
            nextIdx = nextIdx + 1;
            if (nextIdx >= members.length) {
                nextIdx = 0;
            }

            if (next == null) {
                continue;
            }
            MapMessage msg;
            try {
                Member[] tmpBackup = wrap(next);
                // publish the backup data to one node
                msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, (Serializable) key,
                        (Serializable) value, null, channel.getLocalMember(false), tmpBackup);
                if (log.isTraceEnabled()) {
                    log.trace("Publishing backup data:" + msg + " to: " + next.getName());
                }
                UniqueId id = getChannel().send(tmpBackup, msg, getChannelSendOptions());
                if (log.isTraceEnabled()) {
                    log.trace("Data published:" + msg + " msg Id:" + id);
                }
                // we published out to a backup, mark the test success
                success = true;
                backup = tmpBackup;
            } catch (ChannelException x) {
                log.error(sm.getString("lazyReplicatedMap.unableReplicate.backup", key, next, x.getMessage()), x);
                continue;
            }
            try {
                // publish the data out to all nodes
                Member[] proxies = excludeFromSet(backup, getMapMembers());
                if (proxies.length > 0) {
                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null,
                            null, channel.getLocalMember(false), backup);
                    if (log.isTraceEnabled()) {
                        log.trace("Publishing proxy data:" + msg + " to: " + Arrays.toNameString(proxies));
                    }
                    getChannel().send(proxies, msg, getChannelSendOptions());
                }
            } catch (ChannelException x) {
                // log the error, but proceed, this should only happen if a node went down,
                // and if the node went down, then it can't receive the message, the others
                // should still get it.
                log.error(sm.getString("lazyReplicatedMap.unableReplicate.proxy", key, next, x.getMessage()), x);
            }
        } while (!success && (firstIdx != nextIdx));
        return backup;
    }


    private Log getLog() {
        if (log == null) {
            synchronized (this) {
                if (log == null) {
                    log = LogFactory.getLog(LazyReplicatedMap.class);
                }
            }
        }
        return log;
    }
}