File: Concurrent.hs

package info (click to toggle)
git-annex 7.20190129-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 56,292 kB
  • sloc: haskell: 59,105; sh: 1,255; makefile: 225; perl: 136; ansic: 44
file content (64 lines) | stat: -rw-r--r-- 1,817 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
{- git-annex concurrent state
 -
 - Copyright 2015 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

module Annex.Concurrent where

import Annex
import Annex.Common
import Annex.Action
import qualified Annex.Queue

import qualified Data.Map as M

{- Allows forking off a thread that uses a copy of the current AnnexState
 - to run an Annex action.
 -
 - The returned IO action can be used to start the thread.
 - It returns an Annex action that must be run in the original 
 - calling context to merge the forked AnnexState back into the
 - current AnnexState.
 -}
forkState :: Annex a -> Annex (IO (Annex a))
forkState a = do
	st <- dupState
	return $ do
		(ret, newst) <- run st a
		return $ do
			mergeState newst
			return ret

{- Returns a copy of the current AnnexState that is safe to be
 - used when forking off a thread. 
 -
 - After an Annex action is run using this AnnexState, it
 - should be merged back into the current Annex's state,
 - by calling mergeState.
 -}
dupState :: Annex AnnexState
dupState = do
	st <- Annex.getState id
	return $ st
		{ Annex.workers = []
		-- each thread has its own repoqueue, but the repoqueuesem
		-- is shared to prevent more than one thread flushing its
		-- queue at the same time
		, Annex.repoqueue = Nothing
		-- avoid sharing eg, open file handles
		, Annex.catfilehandles = M.empty
		, Annex.checkattrhandle = Nothing
		, Annex.checkignorehandle = Nothing
		}

{- Merges the passed AnnexState into the current Annex state.
 - Also closes various handles in it. -}
mergeState :: AnnexState -> Annex ()
mergeState st = do
	st' <- liftIO $ snd <$> run st stopCoProcesses
	forM_ (M.toList $ Annex.cleanup st') $
		uncurry addCleanup
	Annex.Queue.mergeFrom st'
	changeState $ \s -> s { errcounter = errcounter s + errcounter st' }